http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index b089da8..7c170a7 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.Logging
 import org.apache.spark.network.BlockDataManager
@@ -55,7 +55,7 @@ class NettyBlockRpcServer(
       case openBlocks: OpenBlocks =>
         val blocks: Seq[ManagedBuffer] =
           openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
-        val streamId = streamManager.registerStream(blocks.iterator)
+        val streamId = streamManager.registerStream(blocks.iterator.asJava)
         logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
         responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
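
Every hunk in this commit applies the same migration: the implicit scala.collection.JavaConversions views are dropped and each Java/Scala boundary is converted explicitly with JavaConverters. A minimal standalone sketch of that pattern (illustrative names only, not Spark code):

    import java.util.{ArrayList => JArrayList}
    import scala.collection.JavaConverters._

    object ConvertersSketch {
      def main(args: Array[String]): Unit = {
        val javaList = new JArrayList[String]()
        javaList.add("a")
        javaList.add("b")

        // Java -> Scala: explicit .asScala wrapper instead of an implicit view
        val scalaSeq: Seq[String] = javaList.asScala.toSeq

        // Scala -> Java: explicit .asJava for APIs that expect java.util.List
        val backToJava: java.util.List[String] = scalaSeq.asJava

        println(scalaSeq.mkString(","))   // a,b
        println(backToJava.size())        // 2
      }
    }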
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index d650d5f..ff8aae9 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.network.netty
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -58,7 +58,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
         securityManager.isSaslEncryptionEnabled()))
     }
     transportContext = new TransportContext(transportConf, rpcHandler)
-    clientFactory = transportContext.createClientFactory(clientBootstrap.toList)
+    clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
     server = createServer(serverBootstrap.toList)
     appId = conf.getAppId
     logInfo("Server created on " + server.getPort)
@@ -67,7 +67,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   /** Creates and binds the TransportServer, possibly trying multiple ports. */
   private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
     def startService(port: Int): (TransportServer, Int) = {
-      val server = transportContext.createServer(port, bootstraps)
+      val server = transportContext.createServer(port, bootstraps.asJava)
       (server, server.getPort)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
index 1499da0..8d9ebad 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -23,7 +23,7 @@ import java.nio.channels._
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.LinkedList
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
@@ -145,7 +145,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   }
 
   def callOnExceptionCallbacks(e: Throwable) {
-    onExceptionCallbacks foreach {
+    onExceptionCallbacks.asScala.foreach {
       callback =>
         try {
           callback(this, e)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 91b07ce..5afce75 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
@@ -48,9 +48,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
     if (outputsMerged == totalOutputs) {
       val result = new JHashMap[T, BoundedDouble](sums.size)
       sums.foreach { case (key, sum) =>
-        result(key) = new BoundedDouble(sum, 1.0, sum, sum)
+        result.put(key, new BoundedDouble(sum, 1.0, sum, sum))
       }
-      result
+      result.asScala
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
@@ -64,9 +64,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
         val stdev = math.sqrt(variance)
         val low = mean - confFactor * stdev
         val high = mean + confFactor * stdev
-        result(key) = new BoundedDouble(mean, confidence, low, high)
+        result.put(key, new BoundedDouble(mean, confidence, low, high))
       }
-      result
+      result.asScala
     }
   }
 }
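
This evaluator change (repeated in GroupedMeanEvaluator and GroupedSumEvaluator below) has one shape: write through the Java HashMap API with put, then hand the map back as a scala.collection.Map via .asScala instead of relying on the old implicit mutable-Map view. A standalone sketch of that shape (illustrative names, not the Spark classes):

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    object EvaluatorSketch {
      // Build a Java map, then expose it as a Scala Map without copying.
      def merged(sums: Map[String, Long]): scala.collection.Map[String, Double] = {
        val result = new JHashMap[String, Double](sums.size)
        sums.foreach { case (key, sum) => result.put(key, sum.toDouble) }
        result.asScala   // a mutable wrapper around the JHashMap
      }

      def main(args: Array[String]): Unit =
        println(merged(Map("a" -> 1L, "b" -> 2L)))   // e.g. Map(a -> 1.0, b -> 2.0)
    }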

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
index af26c3d..a164040 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 
@@ -55,9 +55,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
       while (iter.hasNext) {
         val entry = iter.next()
         val mean = entry.getValue.mean
-        result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
+        result.put(entry.getKey, new BoundedDouble(mean, 1.0, mean, mean))
       }
-      result
+      result.asScala
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
@@ -72,9 +72,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub
         val confFactor = studentTCacher.get(counter.count)
         val low = mean - confFactor * stdev
         val high = mean + confFactor * stdev
-        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+        result.put(entry.getKey, new BoundedDouble(mean, confidence, low, high))
       }
-      result
+      result.asScala
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
index 442fb86..54a1bea 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.partial
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 
@@ -55,9 +55,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl
       while (iter.hasNext) {
         val entry = iter.next()
         val sum = entry.getValue.sum
-        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+        result.put(entry.getKey, new BoundedDouble(sum, 1.0, sum, sum))
       }
-      result
+      result.asScala
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
@@ -80,9 +80,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl
         val confFactor = studentTCacher.get(counter.count)
         val low = sumEstimate - confFactor * sumStdev
         val high = sumEstimate + confFactor * sumStdev
-        result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
+        result.put(entry.getKey, new BoundedDouble(sumEstimate, confidence, low, high))
       }
-      result
+      result.asScala
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 326fafb..4e5f2e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat
 import java.util.{Date, HashMap => JHashMap}
 
 import scala.collection.{Map, mutable}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.util.DynamicVariable
@@ -312,14 +312,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { pair =>
+      m2.asScala.foreach { pair =>
         val old = m1.get(pair._1)
         m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
 
-    self.mapPartitions(reducePartition).reduce(mergeMaps)
+    self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
   }
 
   /** Alias for reduceByKeyLocally */

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 3bb9998..afbe566 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -23,7 +23,7 @@ import java.io.IOException
 import java.io.PrintWriter
 import java.util.StringTokenizer
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
@@ -72,7 +72,7 @@ private[spark] class PipedRDD[T: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[String] = {
-    val pb = new ProcessBuilder(command)
+    val pb = new ProcessBuilder(command.asJava)
     // Add the environmental variables to the process.
     val currentEnvVars = pb.environment()
     envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
@@ -81,7 +81,7 @@ private[spark] class PipedRDD[T: ClassTag](
     // so the user code can access the input filename
     if (split.isInstanceOf[HadoopPartition]) {
       val hadoopSplit = split.asInstanceOf[HadoopPartition]
-      currentEnvVars.putAll(hadoopSplit.getPipeEnvVars())
+      currentEnvVars.putAll(hadoopSplit.getPipeEnvVars().asJava)
     }
 
     // When spark.worker.separated.working.directory option is turned on, each
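
For context, ProcessBuilder and its environment map are plain Java APIs, which is why the command Seq above now needs an explicit .asJava. A minimal sketch outside Spark (assumes a POSIX echo binary is available):

    import scala.collection.JavaConverters._
    import scala.io.Source

    object PipeSketch {
      def main(args: Array[String]): Unit = {
        val command: Seq[String] = Seq("echo", "hello from the pipe")
        // ProcessBuilder only accepts java.util.List[String]
        val pb = new ProcessBuilder(command.asJava)
        // pb.environment() is a java.util.Map[String, String]
        pb.environment().put("EXAMPLE_VAR", "1")
        val proc = pb.start()
        println(Source.fromInputStream(proc.getInputStream).mkString.trim)
        proc.waitFor()
      }
    }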

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index f7cb179..9a4fa30 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
@@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
     integrate(0, t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
     integrate(1, t => map.remove(t._1))
-    map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
+    map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
   }
 
   override def clearDependencies() {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index bac37bf..0e438ab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.immutable.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
@@ -107,7 +107,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
     val retval = new ArrayBuffer[SplitInfo]()
     val list = instance.getSplits(job)
-    for (split <- list) {
+    for (split <- list.asScala) {
       retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 174b732..5821afe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
@@ -74,7 +74,7 @@ private[spark] class Pool(
     if (schedulableNameToSchedulable.containsKey(schedulableName)) {
       return schedulableNameToSchedulable.get(schedulableName)
     }
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       val sched = schedulable.getSchedulableByName(schedulableName)
       if (sched != null) {
         return sched
@@ -84,12 +84,12 @@ private[spark] class Pool(
   }
 
   override def executorLost(executorId: String, host: String) {
-    schedulableQueue.foreach(_.executorLost(executorId, host))
+    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
   }
 
   override def checkSpeculatableTasks(): Boolean = {
     var shouldRevive = false
-    for (schedulable <- schedulableQueue) {
+    for (schedulable <- schedulableQueue.asScala) {
       shouldRevive |= schedulable.checkSpeculatableTasks()
     }
     shouldRevive
@@ -98,7 +98,7 @@ private[spark] class Pool(
   override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
     val sortedSchedulableQueue =
-      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
+      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
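
schedulableQueue here is a java.util.concurrent.ConcurrentLinkedQueue, so every Scala-style traversal now goes through an explicit .asScala view. A small standalone sketch of that idiom (illustrative only):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object QueueSketch {
      def main(args: Array[String]): Unit = {
        val schedulableQueue = new ConcurrentLinkedQueue[String]()
        schedulableQueue.add("pool-b")
        schedulableQueue.add("pool-a")

        // for-comprehension over the Java queue via the .asScala wrapper
        for (name <- schedulableQueue.asScala) println(s"visiting $name")

        // Scala collection methods (sortWith, foreach, ...) on the same wrapper
        val sorted = schedulableQueue.asScala.toSeq.sortWith(_ < _)
        println(sorted.mkString(", "))   // pool-a, pool-b
      }
    }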

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d6e1e9e..452c32d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, List => JList}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import com.google.common.collect.HashBiMap
@@ -233,7 +233,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-      for (offer <- offers) {
+      for (offer <- offers.asScala) {
         val offerAttributes = toAttributeMap(offer.getAttributesList)
         val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
         val slaveId = offer.getSlaveId.getValue
@@ -251,21 +251,21 @@ private[spark] class CoarseMesosSchedulerBackend(
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
           totalCoresAcquired += cpusToUse
           val taskId = newMesosTaskId()
-          taskIdToSlaveId(taskId) = slaveId
+          taskIdToSlaveId.put(taskId, slaveId)
           slaveIdsWithExecutors += slaveId
           coresByTaskId(taskId) = cpusToUse
           // Gather cpu resources from the available resources and use them in the task.
           val (remainingResources, cpuResourcesToUse) =
             partitionResources(offer.getResourcesList, "cpus", cpusToUse)
           val (_, memResourcesToUse) =
-            partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
           val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
             .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse)
-            .addAllResources(memResourcesToUse)
+            .addAllResources(cpuResourcesToUse.asJava)
+            .addAllResources(memResourcesToUse.asJava)
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil
@@ -314,9 +314,9 @@ private[spark] class CoarseMesosSchedulerBackend(
       }
 
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
-        val slaveId = taskIdToSlaveId(taskId)
+        val slaveId = taskIdToSlaveId.get(taskId)
         slaveIdsWithExecutors -= slaveId
-        taskIdToSlaveId -= taskId
+        taskIdToSlaveId.remove(taskId)
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
@@ -361,7 +361,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     stateLock.synchronized {
       if (slaveIdsWithExecutors.contains(slaveId)) {
         val slaveIdToTaskId = taskIdToSlaveId.inverse()
-        if (slaveIdToTaskId.contains(slaveId)) {
+        if (slaveIdToTaskId.containsKey(slaveId)) {
           val taskId: Int = slaveIdToTaskId.get(slaveId)
           taskIdToSlaveId.remove(taskId)
           removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
@@ -411,7 +411,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     val slaveIdToTaskId = taskIdToSlaveId.inverse()
     for (executorId <- executorIds) {
       val slaveId = executorId.split("/")(0)
-      if (slaveIdToTaskId.contains(slaveId)) {
+      if (slaveIdToTaskId.containsKey(slaveId)) {
         mesosDriver.killTask(
           TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
         pendingRemovedSlaveIds += slaveId
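
One behavioral detail worth noting in this file: with the implicit views gone, membership tests against Java maps must use the Java API (containsKey) or an explicit .asScala view. A standalone sketch, with a plain java.util.HashMap standing in for the Guava HashBiMap used above:

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    object ContainsKeySketch {
      def main(args: Array[String]): Unit = {
        val slaveIdToTaskId = new JHashMap[String, Int]()
        slaveIdToTaskId.put("slave-1", 7)

        println(slaveIdToTaskId.containsKey("slave-1"))       // Java API: true
        println(slaveIdToTaskId.asScala.contains("slave-1"))  // Scala view: true
        println(slaveIdToTaskId.asScala.get("slave-1"))       // Some(7), not a raw value or null
      }
    }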

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 3efc536..e0c547d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
@@ -129,6 +129,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
   }
 
   override def fetchAll[T](): Iterable[T] = {
-    zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten
+    zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1206f18..07da924 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, Date, List => JList}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -350,7 +350,7 @@ private[spark] class MesosClusterScheduler(
         }
         // TODO: Page the status updates to avoid trying to reconcile
         // a large amount of tasks at once.
-        driver.reconcileTasks(statuses)
+        driver.reconcileTasks(statuses.toSeq.asJava)
       }
     }
   }
@@ -493,10 +493,10 @@ private[spark] class MesosClusterScheduler(
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
-    val currentOffers = offers.map { o =>
+    val currentOffers = offers.asScala.map(o =>
       new ResourceOffer(
         o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
-    }.toList
+    ).toList
     logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
     val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
     val currentTime = new Date()
@@ -521,10 +521,10 @@ private[spark] class MesosClusterScheduler(
         currentOffers,
         tasks)
     }
-    tasks.foreach { case (offerId, tasks) =>
-      driver.launchTasks(Collections.singleton(offerId), tasks)
+    tasks.foreach { case (offerId, taskInfos) =>
+      driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
     }
-    offers
+    offers.asScala
       .filter(o => !tasks.keySet.contains(o.getId))
       .foreach(o => driver.declineOffer(o.getId))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 5c20606..2e42405 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.File
 import java.util.{ArrayList => JArrayList, Collections, List => JList}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.{Scheduler => MScheduler, _}
@@ -129,14 +129,12 @@ private[spark] class MesosSchedulerBackend(
     val (resourcesAfterCpu, usedCpuResources) =
       partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
     val (resourcesAfterMem, usedMemResources) =
-      partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+      partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
 
-    builder.addAllResources(usedCpuResources)
-    builder.addAllResources(usedMemResources)
+    builder.addAllResources(usedCpuResources.asJava)
+    builder.addAllResources(usedMemResources.asJava)
 
-    sc.conf.getOption("spark.mesos.uris").map { uris =>
-      setupUris(uris, command)
-    }
+    sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
 
     val executorInfo = builder
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -148,7 +146,7 @@ private[spark] class MesosSchedulerBackend(
         .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
     }
 
-    (executorInfo.build(), resourcesAfterMem)
+    (executorInfo.build(), resourcesAfterMem.asJava)
   }
 
   /**
@@ -193,7 +191,7 @@ private[spark] class MesosSchedulerBackend(
 
   private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
     val builder = new StringBuilder
-    tasks.foreach { t =>
+    tasks.asScala.foreach { t =>
       builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
         .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
         .append("Task resources: ").append(t.getResourcesList).append("\n")
@@ -211,7 +209,7 @@ private[spark] class MesosSchedulerBackend(
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
       // Fail-fast on offers we know will be rejected
-      val (usableOffers, unUsableOffers) = offers.partition { o =>
+      val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
@@ -323,10 +321,10 @@ private[spark] class MesosSchedulerBackend(
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
       .setExecutor(executorInfo)
       .setName(task.name)
-      .addAllResources(cpuResources)
+      .addAllResources(cpuResources.asJava)
       .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
       .build()
-    (taskInfo, finalResources)
+    (taskInfo, finalResources.asJava)
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 5b854aa..860c8e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
@@ -137,7 +137,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   protected def getResource(res: JList[Resource], name: String): Double = {
     // A resource can have multiple values in the offer since it can either be from
     // a specific role or wildcard.
-    res.filter(_.getName == name).map(_.getScalar.getValue).sum
+    res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
   }
 
   protected def markRegistered(): Unit = {
@@ -169,7 +169,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       amountToUse: Double): (List[Resource], List[Resource]) = {
     var remain = amountToUse
     var requestedResources = new ArrayBuffer[Resource]
-    val remainingResources = resources.map {
+    val remainingResources = resources.asScala.map {
       case r => {
         if (remain > 0 &&
           r.getType == Value.Type.SCALAR &&
@@ -214,7 +214,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * @return
    */
   protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
-    offerAttributes.map(attr => {
+    offerAttributes.asScala.map(attr => {
       val attrValue = attr.getType match {
         case Value.Type.SCALAR => attr.getScalar
         case Value.Type.RANGES => attr.getRanges
@@ -253,7 +253,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
             requiredValues.map(_.toLong).exists(offerRange.contains(_))
           case Some(offeredValue: Value.Set) =>
             // check if the specified required values is a subset of offered set
-            requiredValues.subsetOf(offeredValue.getItemList.toSet)
+            requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet)
           case Some(textValue: Value.Text) =>
             // check if the specified value is equal, if multiple values are specified
             // we succeed if any of them match.
@@ -299,14 +299,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       Map()
     } else {
       try {
-        Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
-          case (k, v) =>
-            if (v == null || v.isEmpty) {
-              (k, Set[String]())
-            } else {
-              (k, v.split(',').toSet)
-            }
-        }
+        splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
+          if (v == null || v.isEmpty) {
+            Set[String]()
+          } else {
+            v.split(',').toSet
+          }
+        )
       } catch {
         case NonFatal(e) =>
            throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
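
The constraint-parsing rewrite above converts the java.util.Map produced by Guava's MapSplitter with .asScala.toMap and then reshapes the values. A standalone sketch of the same shape, using a plain java.util.Map in place of the splitter output (an assumption for brevity; mapValues is lazy in these Scala versions, so the sketch forces a strict Map with toMap):

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    object ConstraintSketch {
      def parse(raw: java.util.Map[String, String]): Map[String, Set[String]] =
        raw.asScala.toMap.mapValues { v =>
          if (v == null || v.isEmpty) Set.empty[String] else v.split(',').toSet
        }.toMap  // force the mapValues view into a strict Map

      def main(args: Array[String]): Unit = {
        val attrs = new JHashMap[String, String]()
        attrs.put("zone", "us-east-1a,us-east-1b")
        attrs.put("gpu", "")
        println(parse(attrs))  // e.g. Map(zone -> Set(us-east-1a, us-east-1b), gpu -> Set())
      }
    }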

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 0ff7562..048a938 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,6 +21,7 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -373,16 +374,15 @@ private class JavaIterableWrapperSerializer
   override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
     : java.lang.Iterable[_] = {
     kryo.readClassAndObject(in) match {
-      case scalaIterable: Iterable[_] =>
-        scala.collection.JavaConversions.asJavaIterable(scalaIterable)
-      case javaIterable: java.lang.Iterable[_] =>
-        javaIterable
+      case scalaIterable: Iterable[_] => scalaIterable.asJava
+      case javaIterable: java.lang.Iterable[_] => javaIterable
     }
   }
 }
 
 private object JavaIterableWrapperSerializer extends Logging {
-  // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
+  // The class returned by JavaConverters.asJava
+  // (scala.collection.convert.Wrappers$IterableWrapper).
   val wrapperClass =
     scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
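
For reference, .asJava on a Scala Iterable produces the same kind of wrapper (an IterableWrapper) that the serializer special-cases above, without copying the underlying collection. A minimal sketch:

    import scala.collection.JavaConverters._

    object IterableWrapSketch {
      def main(args: Array[String]): Unit = {
        val scalaIterable: Iterable[Int] = Seq(1, 2, 3)

        // Wraps (does not copy) the Scala collection in a java.lang.Iterable
        val javaIterable: java.lang.Iterable[Int] = scalaIterable.asJava
        println(javaIterable.getClass.getName)  // a scala.collection.convert wrapper class

        val it = javaIterable.iterator()
        while (it.hasNext) println(it.next())
      }
    }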
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index f6a96d8..c057de9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.executor.ShuffleWriteMetrics
@@ -210,11 +210,13 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
     shuffleStates.get(shuffleId) match {
       case Some(state) =>
         if (consolidateShuffleFiles) {
-          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+          for (fileGroup <- state.allFileGroups.asScala;
+               file <- fileGroup.files) {
             file.delete()
           }
         } else {
-          for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+          for (mapId <- state.completedMapTasks.asScala;
+               reduceId <- 0 until state.numBuckets) {
             val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
             blockManager.diskBlockManager.getFile(blockId).delete()
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 6fec524..7db6035 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.immutable.HashSet
 import scala.collection.mutable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, ThreadSafeRpcEndpoint}
@@ -133,7 +133,7 @@ class BlockManagerMasterEndpoint(
 
     // Find all blocks for the given RDD, remove the block from both blockLocations and
     // the blockManagerInfo that is tracking the blocks.
-    val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
     blocks.foreach { blockId =>
       val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
       bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
@@ -242,7 +242,7 @@ class BlockManagerMasterEndpoint(
 
   private def storageStatus: Array[StorageStatus] = {
     blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, info.blocks)
+      new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
     }.toArray
   }
 
@@ -292,7 +292,7 @@ class BlockManagerMasterEndpoint(
           if (askSlaves) {
             info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
           } else {
-            Future { info.blocks.keys.filter(filter).toSeq }
+            Future { info.blocks.asScala.keys.filter(filter).toSeq }
           }
         future
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 78e7ddc..1738258 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import scala.collection.JavaConversions.mapAsJavaMap
+import scala.collection.JavaConverters._
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
     val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig
         .getOrElse(ConfigFactory.empty())
 
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String])
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
       .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
       s"""
       |akka.daemonic = on

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index a725767..13cb516 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -19,12 +19,11 @@ package org.apache.spark.util
 
 import java.util.concurrent.CopyOnWriteArrayList
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.spark.Logging
-import org.apache.spark.scheduler.SparkListener
 
 /**
  * An event bus which posts events to its listeners.
@@ -46,7 +45,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    * `postToAll` in the same thread for all events.
    */
   final def postToAll(event: E): Unit = {
-    // JavaConversions will create a JIterableWrapper if we use some Scala collection functions.
+    // JavaConverters can create a JIterableWrapper if we use asScala.
     // However, this method will be called frequently. To avoid the wrapper cost, here ewe use
     // Java Iterator directly.
     val iter = listeners.iterator
@@ -69,7 +68,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
   private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
     val c = implicitly[ClassTag[T]].runtimeClass
-    listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
+    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
   }
 
 }
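
The comment rewrite above keeps the hot path on the raw Java iterator and reserves .asScala for the rarely-called lookup. A standalone sketch of that split, using a CopyOnWriteArrayList of strings in place of real listeners:

    import java.util.concurrent.CopyOnWriteArrayList
    import scala.collection.JavaConverters._

    object ListenerBusSketch {
      def main(args: Array[String]): Unit = {
        val listeners = new CopyOnWriteArrayList[String]()
        listeners.add("jobCounter")
        listeners.add("metricsListener")

        // Hot path: plain Java iterator, no wrapper allocation per post.
        val iter = listeners.iterator
        while (iter.hasNext) {
          println(s"posting event to ${iter.next()}")
        }

        // Cold path: explicit .asScala view for Scala collection operations.
        println(listeners.asScala.count(_.endsWith("Listener")))  // 1
      }
    }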

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
index 169489d..a1c3321 100644
--- a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala
@@ -21,8 +21,6 @@ import java.net.{URLClassLoader, URL}
 import java.util.Enumeration
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConversions._
-
 /**
  * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index 8de75ba..d7e5143 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -21,7 +21,8 @@ import java.util.Set
 import java.util.Map.Entry
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.{JavaConversions, mutable}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.Logging
 
@@ -50,8 +51,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
   }
 
   def iterator: Iterator[(A, B)] = {
-    val jIterator = getEntrySet.iterator
-    JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue.value))
+    getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value))
   }
 
   def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet
@@ -90,9 +90,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa
   }
 
   override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
-    JavaConversions.mapAsScalaConcurrentMap(internalMap)
-      .map { case (k, TimeStampedValue(v, t)) => (k, v) }
-      .filter(p)
+    internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p)
   }
 
   override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
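
For context, .asScala on a ConcurrentHashMap returns a scala.collection.concurrent.Map view, so the filter above operates directly on the live map rather than on a copy. A minimal standalone sketch:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object ConcurrentMapSketch {
      def main(args: Array[String]): Unit = {
        val internalMap = new ConcurrentHashMap[String, Int]()
        internalMap.put("a", 1)
        internalMap.put("b", 25)

        // .asScala wraps the same underlying map; no elements are copied.
        val view: scala.collection.concurrent.Map[String, Int] = internalMap.asScala

        // Scala collection operations over the Java map's entries.
        println(view.map { case (k, v) => (k, v * 10) }.filter(_._2 > 100))  // e.g. Map(b -> 250)
      }
    }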

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
index 7cd8f28..65efeb1 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 import scala.collection.mutable.Set
 
 private[spark] class TimeStampedHashSet[A] extends Set[A] {
@@ -31,7 +31,7 @@ private[spark] class TimeStampedHashSet[A] extends Set[A] {
 
   def iterator: Iterator[A] = {
     val jIterator = internalMap.entrySet().iterator()
-    JavaConversions.asScalaIterator(jIterator).map(_.getKey)
+    jIterator.asScala.map(_.getKey)
   }
 
   override def + (elem: A): Set[A] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8313312..2bab4af 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -25,7 +25,7 @@ import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
@@ -748,12 +748,12 @@ private[spark] object Utils extends Logging {
         // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
         // on unix-like system. On windows, it returns in index order.
         // It's more proper to pick ip address following system output order.
-        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
         val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
 
         for (ni <- reOrderedNetworkIFs) {
-          val addresses = ni.getInetAddresses.toList
-            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+          val addresses = ni.getInetAddresses.asScala
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
           if (addresses.nonEmpty) {
             val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
             // because of Inet6Address.toHostName may add interface at the end if it knows about it
@@ -1498,10 +1498,8 @@ private[spark] object Utils extends Logging {
     * properties which have been set explicitly, as well as those for which only a default value
     * has been defined. */
   def getSystemProperties: Map[String, String] = {
-    val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield
-      (key, System.getProperty(key))
-
-    sysProps.toMap
+    System.getProperties.stringPropertyNames().asScala
+      .map(key => (key, System.getProperty(key))).toMap
   }
 
   /**
@@ -1812,7 +1810,8 @@ private[spark] object Utils extends Logging {
     try {
       val properties = new Properties()
       properties.load(inReader)
-      properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+      properties.stringPropertyNames().asScala.map(
+        k => (k, properties.getProperty(k).trim)).toMap
     } catch {
       case e: IOException =>
         throw new SparkException(s"Failed when loading Spark properties from $filename", e)
@@ -1941,7 +1940,8 @@ private[spark] object Utils extends Logging {
           return true
         }
         isBindCollision(e.getCause)
-      case e: MultiException => e.getThrowables.exists(isBindCollision)
+      case e: MultiException =>
+        e.getThrowables.asScala.exists(isBindCollision)
       case e: Exception => isBindCollision(e.getCause)
       case _ => false
     }
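
The getSystemProperties rewrite in the middle hunk above is a compact illustration of the new style: stringPropertyNames() is a java.util.Set[String], so it is viewed with .asScala before map/toMap. A standalone sketch:

    import scala.collection.JavaConverters._

    object SysPropsSketch {
      // Snapshot the JVM system properties as an immutable Scala Map.
      def getSystemProperties: Map[String, String] =
        System.getProperties.stringPropertyNames().asScala
          .map(key => (key, System.getProperty(key))).toMap

      def main(args: Array[String]): Unit =
        println(getSystemProperties.getOrElse("java.version", "unknown"))
    }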

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
index bdbca00..4939b60 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+import scala.collection.JavaConverters._
 
 import com.google.common.collect.{Ordering => GuavaOrdering}
 
@@ -34,6 +34,6 @@ private[spark] object Utils {
     val ordering = new GuavaOrdering[T] {
       override def compare(l: T, r: T): Int = ord.compare(l, r)
     }
-    collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+    ordering.leastOf(input.asJava, num).iterator.asScala
   }
 }
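
The takeOrdered helper above round-trips through Guava: the Scala Iterator goes in via .asJava and Guava's result list comes back via .asScala. A self-contained sketch of the same helper, assuming Guava is on the classpath:

    import scala.collection.JavaConverters._
    import com.google.common.collect.{Ordering => GuavaOrdering}

    object TakeOrderedSketch {
      // Returns the `num` smallest elements of `input` according to `ord`.
      def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
        val ordering = new GuavaOrdering[T] {
          override def compare(l: T, r: T): Int = ord.compare(l, r)
        }
        ordering.leastOf(input.asJava, num).iterator.asScala
      }

      def main(args: Array[String]): Unit =
        println(takeOrdered(Iterator(5, 1, 4, 2, 3), 3).toList)  // List(1, 2, 3)
    }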

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index ffe4b4b..ebd3d61 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -24,10 +24,10 @@ import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
 
-import scala.collection.JavaConversions;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
+import scala.collection.JavaConverters;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -1473,7 +1473,9 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, results);
 
     Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
-        combinedRDD.rdd(), JavaConversions.asScalaBuffer(Lists.<RDD<?>>newArrayList()));
+        combinedRDD.rdd(),
+        JavaConverters.collectionAsScalaIterableConverter(
+            Collections.<RDD<?>>emptyList()).asScala().toSeq());
     combinedRDD = originalRDD.keyBy(keyFunction)
         .combineByKey(
              createCombinerFunction,

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 90cb7da..ff9a92c 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.util.concurrent.{TimeUnit, Executors}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.{Try, Random}
@@ -148,7 +149,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   }
 
   test("Thread safeness - SPARK-5425") {
-    import scala.collection.JavaConversions._
     val executor = Executors.newSingleThreadScheduledExecutor()
     val sf = executor.scheduleAtFixedRate(new Runnable {
       override def run(): Unit =
@@ -163,8 +163,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
       }
     } finally {
       executor.shutdownNow()
-      for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425."))
-        System.getProperties.remove(key)
+      val sysProps = System.getProperties
+      for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425."))
+        sysProps.remove(key)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index cbd2aee..86eb41d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy
 
 import java.net.URL
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.io.Source
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 47a6408..1ed4bae 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -21,14 +21,14 @@ import java.io.{PrintStream, OutputStream, File}
 import java.net.URI
 import java.util.jar.Attributes.Name
 import java.util.jar.{JarFile, Manifest}
-import java.util.zip.{ZipEntry, ZipFile}
+import java.util.zip.ZipFile
 
-import org.scalatest.BeforeAndAfterEach
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
@@ -142,7 +142,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
       IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
       val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
       assert(finalZip.exists())
-      val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+      val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
       assert(entries.contains("/test.R"))
       assert(entries.contains("/SparkR/abc.R"))
       assert(entries.contains("/SparkR/DESCRIPTION"))

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index bed6f3e..98664dc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
-import scala.collection.JavaConversions._
-
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 
@@ -36,6 +34,7 @@ class ExecutorRunnerTest extends SparkFunSuite {
       ExecutorState.RUNNING)
     val builder = CommandUtils.buildProcessBuilder(
       appDesc.command, new SecurityManager(conf), 512, sparkHome, er.substituteVariables)
-    assert(builder.command().last === appId)
+    val builderCommand = builder.command()
+    assert(builderCommand.get(builderCommand.size() - 1) === appId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 730535e..a9652d7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import java.util.concurrent.Semaphore
 
 import scala.collection.mutable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
@@ -365,10 +365,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
         classOf[BasicJobCounter].getName)
     sc = new SparkContext(conf)
-    sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1)
-    sc.listenerBus.listeners.collect {
-      case x: ListenerThatAcceptsSparkConf => x
-    }.size should be (1)
+    sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1)
+    sc.listenerBus.listeners.asScala
+      .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1)
   }
 
   /**

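A minimal sketch, with stand-in listener classes rather than the real listener bus internals, of the counting idiom the suite switches to: .asScala exposes the Java list as a Scala Buffer, and count avoids building the intermediate collection that collect { ... }.size did.

    import java.util.concurrent.CopyOnWriteArrayList

    import scala.collection.JavaConverters._

    object CountByType {
      trait Listener
      class BasicJobCounter extends Listener
      class ConfAwareListener extends Listener

      def main(args: Array[String]): Unit = {
        val listeners = new CopyOnWriteArrayList[Listener]()
        listeners.add(new BasicJobCounter)
        listeners.add(new ConfAwareListener)
        // Wrap once, then count by runtime type.
        assert(listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) == 1)
      }
    }
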
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 5ed30f6..319b317 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.nio.ByteBuffer
-import java.util
+import java.util.Arrays
+import java.util.Collection
 import java.util.Collections
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -61,7 +62,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
 
     val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, 
"master")
 
-    val resources = List(
+    val resources = Arrays.asList(
       mesosSchedulerBackend.createResource("cpus", 4),
       mesosSchedulerBackend.createResource("mem", 1024))
     // uri is null.
@@ -98,7 +99,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
     val (execInfo, _) = backend.createExecutorInfo(
-      List(backend.createResource("cpus", 4)), "mockExecutor")
+      Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
@@ -179,7 +180,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
 
-    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
     when(
       driver.launchTasks(
         Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
@@ -279,7 +280,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
 
-    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
     when(
       driver.launchTasks(
         Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
@@ -304,7 +305,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext wi
     assert(cpusDev.getName.equals("cpus"))
     assert(cpusDev.getScalar.getValue.equals(1.0))
     assert(cpusDev.getRole.equals("dev"))
-    val executorResources = taskInfo.getExecutor.getResourcesList
+    val executorResources = taskInfo.getExecutor.getResourcesList.asScala
     assert(executorResources.exists { r =>
       r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && 
r.getRole.equals("prod")
     })
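
Where a Java API expects a java.util.List, the patch builds one directly with Arrays.asList instead of passing a Scala List through an implicit. A hedged sketch of the equivalence (describe is an illustrative stand-in for the Mesos builder calls):

    import java.util.Arrays

    import scala.collection.JavaConverters._

    object JavaListArgs {
      def describe(names: java.util.List[String]): String =
        names.asScala.mkString(", ")

      def main(args: Array[String]): Unit = {
        val direct = Arrays.asList("cpus", "mem")      // fixed-size java.util.List
        val converted = Seq("cpus", "mem").asJava      // Java wrapper over a Scala Seq
        assert(describe(direct) == describe(converted))
      }
    }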

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 23a1fdb..8d1c9d1 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.serializer
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
@@ -173,7 +174,7 @@ class KryoSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
   test("asJavaIterable") {
     // Serialize a collection wrapped by asJavaIterable
     val ser = new KryoSerializer(conf).newInstance()
-    val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345)))
+    val a = ser.serialize(Seq(12345).asJava)
     val b = ser.deserialize[java.lang.Iterable[Int]](a)
     assert(b.iterator().next() === 12345)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 69888b2..22e30ec 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -21,7 +21,6 @@ import java.net.{HttpURLConnection, URL}
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
 
 import scala.io.Source
-import scala.collection.JavaConversions._
 import scala.xml.Node
 
 import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler
@@ -341,15 +340,15 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers with B
         // The completed jobs table should have two rows. The first row will 
be the most recent job:
         val firstRow = find(cssSelector("tbody tr")).get.underlying
         val firstRowColumns = firstRow.findElements(By.tagName("td"))
-        firstRowColumns(0).getText should be ("1")
-        firstRowColumns(4).getText should be ("1/1 (2 skipped)")
-        firstRowColumns(5).getText should be ("8/8 (16 skipped)")
+        firstRowColumns.get(0).getText should be ("1")
+        firstRowColumns.get(4).getText should be ("1/1 (2 skipped)")
+        firstRowColumns.get(5).getText should be ("8/8 (16 skipped)")
         // The second row is the first run of the job, where nothing was 
skipped:
         val secondRow = findAll(cssSelector("tbody tr")).toSeq(1).underlying
         val secondRowColumns = secondRow.findElements(By.tagName("td"))
-        secondRowColumns(0).getText should be ("0")
-        secondRowColumns(4).getText should be ("3/3")
-        secondRowColumns(5).getText should be ("24/24")
+        secondRowColumns.get(0).getText should be ("0")
+        secondRowColumns.get(4).getText should be ("3/3")
+        secondRowColumns.get(5).getText should be ("24/24")
       }
     }
   }
@@ -502,8 +501,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser 
with Matchers with B
         for {
           (row, idx) <- rows.zipWithIndex
           columns = row.findElements(By.tagName("td"))
-          id = columns(0).getText()
-          description = columns(1).getText()
+          id = columns.get(0).getText()
+          description = columns.get(1).getText()
         } {
           id should be (expJobInfo(idx)._1)
           description should include (expJobInfo(idx)._2)
@@ -547,8 +546,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser 
with Matchers with B
         for {
           (row, idx) <- rows.zipWithIndex
           columns = row.findElements(By.tagName("td"))
-          id = columns(0).getText()
-          description = columns(1).getText()
+          id = columns.get(0).getText()
+          description = columns.get(1).getText()
         } {
           id should be (expStageInfo(idx)._1)
           description should include (expStageInfo(idx)._2)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index 36832f5..fa07c1e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -19,10 +19,7 @@
 package org.apache.spark.examples
 
 import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-import scala.collection.immutable.Map
+import java.util.Collections
 
 import org.apache.cassandra.hadoop.ConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
@@ -32,7 +29,6 @@ import org.apache.cassandra.utils.ByteBufferUtil
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 
 
 /*
@@ -121,12 +117,9 @@ object CassandraCQLTest {
 
     val casoutputCF = aggregatedRDD.map {
       case (productId, saleCount) => {
-        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
-        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
-        var outColFamVal = new ListBuffer[ByteBuffer]
-        outColFamVal += ByteBufferUtil.bytes(saleCount)
-        val outVal: java.util.List[ByteBuffer] = outColFamVal
-       (outKey, outVal)
+        val outKey = Collections.singletonMap("prod_id", ByteBufferUtil.bytes(productId))
+        val outVal = Collections.singletonList(ByteBufferUtil.bytes(saleCount))
+        (outKey, outVal)
       }
     }
 

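The example above no longer needs collection conversions at all: a one-entry java.util.Map and java.util.List can be built directly. A small sketch of that substitution (the values are illustrative):

    import java.util.Collections

    object SingletonCollections {
      def main(args: Array[String]): Unit = {
        // Immutable, single-entry Java collections with no Scala wrappers involved.
        val outKey: java.util.Map[String, String] = Collections.singletonMap("prod_id", "p-1001")
        val outVal: java.util.List[String] = Collections.singletonList("3")
        assert(outKey.get("prod_id") == "p-1001" && outVal.get(0) == "3")
      }
    }
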
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 96ef3e1..2e56d24 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -19,10 +19,9 @@
 package org.apache.spark.examples
 
 import java.nio.ByteBuffer
+import java.util.Arrays
 import java.util.SortedMap
 
-import scala.collection.JavaConversions._
-
 import org.apache.cassandra.db.IColumn
 import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
 import org.apache.cassandra.hadoop.ConfigHelper
@@ -32,7 +31,6 @@ import org.apache.cassandra.utils.ByteBufferUtil
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 
 /*
  * This example demonstrates using Spark with Cassandra with the New Hadoop 
API and Cassandra
@@ -118,7 +116,7 @@ object CassandraTest {
 
         val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + 
System.currentTimeMillis)
 
-        val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+        val mutations = Arrays.asList(new Mutation(), new Mutation())
         mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
         mutations.get(0).column_or_supercolumn.setColumn(colWord)
         mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index c42df2b..bec61f3 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -18,7 +18,7 @@
 // scalastyle:off println
 package org.apache.spark.examples
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.util.Utils
 
@@ -36,10 +36,10 @@ object DriverSubmissionTest {
     val properties = Utils.getSystemProperties
 
     println("Environment variables containing SPARK_TEST:")
-    env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
+    env.asScala.filter { case (k, _) => k.contains("SPARK_TEST")}.foreach(println)
 
     println("System properties containing spark.test:")
-    properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
+    properties.filter { case (k, _) => k.toString.contains("spark.test") }.foreach(println)
 
     for (i <- 1 until numSecondsToSleep) {
       println(s"Alive for $i out of $numSecondsToSleep seconds")
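
A minimal sketch, using the JDK's System.getenv rather than Spark's helpers, of the filtering idiom shown above: .asScala turns the Java map into a Scala Map view so the usual combinators apply (the "PATH" substring is illustrative).

    import scala.collection.JavaConverters._

    object FilterEnv {
      def main(args: Array[String]): Unit = {
        // java.util.Map[String, String] -> scala.collection.mutable.Map view
        System.getenv().asScala
          .filter { case (k, _) => k.contains("PATH") }
          .foreach { case (k, v) => println(s"$k=$v") }
      }
    }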

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
index 3ebb112..805184e 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.examples.pythonconverters
 
 import java.util.{Collection => JCollection, Map => JMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.avro.generic.{GenericFixed, IndexedRecord}
 import org.apache.avro.mapred.AvroWrapper
@@ -58,7 +58,7 @@ object AvroConversionUtil extends Serializable {
     val map = new java.util.HashMap[String, Any]
     obj match {
       case record: IndexedRecord =>
-        record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
+        record.getSchema.getFields.asScala.zipWithIndex.foreach { case (f, i) =>
           map.put(f.name, fromAvro(record.get(i), f.schema))
         }
       case other => throw new SparkException(
@@ -68,9 +68,9 @@ object AvroConversionUtil extends Serializable {
   }
 
   def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
-    obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
+    obj.asInstanceOf[JMap[_, _]].asScala.map { case (key, value) =>
       (key.toString, fromAvro(value, schema.getValueType))
-    }
+    }.asJava
   }
 
   def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
@@ -91,17 +91,17 @@ object AvroConversionUtil extends Serializable {
 
   def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
     case c: JCollection[_] =>
-      c.map(fromAvro(_, schema.getElementType))
+      c.asScala.map(fromAvro(_, schema.getElementType)).toSeq.asJava
     case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
-      arr.toSeq
+      arr.toSeq.asJava.asInstanceOf[JCollection[Any]]
     case arr: Array[_] =>
-      arr.map(fromAvro(_, schema.getElementType)).toSeq
+      arr.map(fromAvro(_, schema.getElementType)).toSeq.asJava
     case other => throw new SparkException(
       s"Unknown ARRAY type ${other.getClass.getName}")
   }
 
   def unpackUnion(obj: Any, schema: Schema): Any = {
-    schema.getTypes.toList match {
+    schema.getTypes.asScala.toList match {
       case List(s) => fromAvro(obj, s)
       case List(n, s) if n.getType == NULL => fromAvro(obj, s)
       case List(s, n) if n.getType == NULL => fromAvro(obj, s)
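
The conversions above all follow one shape: unwrap the incoming Java collection with .asScala, transform with Scala combinators, then rewrap the result with .asJava. A hedged, Avro-free sketch of that round trip:

    import java.util.{Collection => JCollection, Map => JMap}

    import scala.collection.JavaConverters._

    object RoundTrip {
      // Transform the values of a Java map, returning a Java map.
      def upperValues(in: JMap[String, String]): JMap[String, String] =
        in.asScala.map { case (k, v) => (k, v.toUpperCase) }.asJava

      // Transform the elements of a Java collection, returning a Java collection.
      def lengths(in: JCollection[String]): JCollection[Int] =
        in.asScala.map(_.length).toSeq.asJava
    }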

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 83feb57..00ce47a 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.examples.pythonconverters
 
-import org.apache.spark.api.python.Converter
 import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+
 import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions._
 
+import org.apache.spark.api.python.Converter
 
 /**
  * Implementation of [[org.apache.spark.api.python.Converter]] that converts 
Cassandra
@@ -30,7 +32,7 @@ import collection.JavaConversions._
 class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, 
Int]] {
   override def convert(obj: Any): java.util.Map[String, Int] = {
     val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
-    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
+    result.asScala.mapValues(ByteBufferUtil.toInt).asJava
   }
 }
 
@@ -41,7 +43,7 @@ class CassandraCQLKeyConverter extends Converter[Any, 
java.util.Map[String, Int]
 class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, 
String]] {
   override def convert(obj: Any): java.util.Map[String, String] = {
     val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
-    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
+    result.asScala.mapValues(ByteBufferUtil.string).asJava
   }
 }
 
@@ -52,7 +54,7 @@ class CassandraCQLValueConverter extends Converter[Any, 
java.util.Map[String, St
 class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, 
ByteBuffer]] {
   override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
     val input = obj.asInstanceOf[java.util.Map[String, Int]]
-    mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+    input.asScala.mapValues(ByteBufferUtil.bytes).asJava
   }
 }
 
@@ -63,6 +65,6 @@ class ToCassandraCQLKeyConverter extends Converter[Any, 
java.util.Map[String, By
 class ToCassandraCQLValueConverter extends Converter[Any, 
java.util.List[ByteBuffer]] {
   override def convert(obj: Any): java.util.List[ByteBuffer] = {
     val input = obj.asInstanceOf[java.util.List[String]]
-    seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+    input.asScala.map(ByteBufferUtil.bytes).asJava
   }
 }
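
The converters above share the pattern result.asScala.mapValues(f).asJava. One general Scala caveat, not specific to this patch: mapValues returns a lazy view, so f is re-applied on every lookup through the returned Java map; an eager map is a safe alternative when that matters.

    import scala.collection.JavaConverters._

    object EagerMapValues {
      def toLengths(in: java.util.Map[String, String]): java.util.Map[String, Int] =
        // map (rather than mapValues) materialises the result once up front.
        in.asScala.map { case (k, v) => (k, v.length) }.asJava
    }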

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
index 90d48a6..0a25ee7 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.examples.pythonconverters
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.parsing.json.JSONObject
 
 import org.apache.spark.api.python.Converter
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil
  */
 class HBaseResultToStringConverter extends Converter[Any, String] {
   override def convert(obj: Any): String = {
-    import collection.JavaConverters._
     val result = obj.asInstanceOf[Result]
     val output = result.listCells.asScala.map(cell =>
         Map(
@@ -77,7 +76,7 @@ class StringToImmutableBytesWritableConverter extends 
Converter[Any, ImmutableBy
  */
 class StringListToPutConverter extends Converter[Any, Put] {
   override def convert(obj: Any): Put = {
-    val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+    val output = obj.asInstanceOf[java.util.ArrayList[String]].asScala.map(Bytes.toBytes).toArray
     val put = new Put(output(0))
     put.add(output(1), output(2), output(3))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
 
b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
index fa43629..d265470 100644
--- 
a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ 
b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -20,7 +20,7 @@ import java.net.InetSocketAddress
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{TimeUnit, CountDownLatch, Executors}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
@@ -166,7 +166,7 @@ class SparkSinkSuite extends FunSuite {
     channelContext.put("capacity", channelCapacity.toString)
     channelContext.put("transactionCapacity", 1000.toString)
     channelContext.put("keep-alive", 0.toString)
-    channelContext.putAll(overrides)
+    channelContext.putAll(overrides.asJava)
     channel.setName(scala.util.Random.nextString(10))
     channel.configure(channelContext)
 

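When a Scala Map has to be handed to a Java mutator such as putAll, .asJava now makes the wrapping explicit. A small sketch with a plain java.util.HashMap standing in for the Flume channel context:

    import java.util.{HashMap => JHashMap}

    import scala.collection.JavaConverters._

    object PutAllSketch {
      def main(args: Array[String]): Unit = {
        val overrides = Map("capacity" -> "5000", "keep-alive" -> "0")
        val context = new JHashMap[String, String]()
        context.put("transactionCapacity", "1000")
        context.putAll(overrides.asJava)   // explicit Scala -> Java wrapping
        assert(context.size() == 3)
      }
    }
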
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
index 65c49c1..48df27b 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.flume
 
 import java.io.{ObjectOutput, ObjectInput}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.util.Utils
 import org.apache.spark.Logging
@@ -60,7 +60,7 @@ private[streaming] object EventTransformer extends Logging {
     out.write(body)
     val numHeaders = headers.size()
     out.writeInt(numHeaders)
-    for ((k, v) <- headers) {
+    for ((k, v) <- headers.asScala) {
       val keyBuff = Utils.serialize(k.toString)
       out.writeInt(keyBuff.length)
       out.write(keyBuff)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
index 88cc2aa..b9d4e76 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.streaming.flume
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.base.Throwables
@@ -155,7 +154,7 @@ private[flume] class FlumeBatchFetcher(receiver: 
FlumePollingReceiver) extends R
     val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
     var j = 0
     while (j < events.size()) {
-      val event = events(j)
+      val event = events.get(j)
       val sparkFlumeEvent = new SparkFlumeEvent()
       sparkFlumeEvent.event.setBody(event.getBody)
       sparkFlumeEvent.event.setHeaders(event.getHeaders)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1e32a36..2bf99cb 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer
 import java.util.concurrent.Executors
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.flume.source.avro.AvroSourceProtocol
@@ -99,7 +99,7 @@ class SparkFlumeEvent() extends Externalizable {
 
     val numHeaders = event.getHeaders.size()
     out.writeInt(numHeaders)
-    for ((k, v) <- event.getHeaders) {
+    for ((k, v) <- event.getHeaders.asScala) {
       val keyBuff = Utils.serialize(k.toString)
       out.writeInt(keyBuff.length)
       out.write(keyBuff)
@@ -127,8 +127,7 @@ class FlumeEventServer(receiver : FlumeReceiver) extends 
AvroSourceProtocol {
   }
 
   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = 
{
-    events.foreach (event =>
-      receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
+    events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
     Status.OK
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 583e7dc..0bc4620 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume
 import java.net.InetSocketAddress
 import java.util.concurrent.{LinkedBlockingQueue, Executors}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -94,9 +94,7 @@ private[streaming] class FlumePollingReceiver(
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
     receiverExecutor.shutdownNow()
-    connections.foreach(connection => {
-      connection.transceiver.close()
-    })
+    connections.asScala.foreach(_.transceiver.close())
     channelFactory.releaseExternalResources()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
index 9d9c3b1..70018c8 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.flume
 
 import java.net.{InetSocketAddress, ServerSocket}
 import java.nio.ByteBuffer
-import java.util.{List => JList}
+import java.util.Collections
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.base.Charsets.UTF_8
 import org.apache.avro.ipc.NettyTransceiver
@@ -59,13 +59,13 @@ private[flume] class FlumeTestUtils {
   }
 
   /** Send data to the flume receiver */
-  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+  def writeInput(input: Seq[String], enableCompression: Boolean): Unit = {
     val testAddress = new InetSocketAddress("localhost", testPort)
 
     val inputEvents = input.map { item =>
       val event = new AvroFlumeEvent
       event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
-      event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+      event.setHeaders(Collections.singletonMap("test", "header"))
       event
     }
 
@@ -88,7 +88,7 @@ private[flume] class FlumeTestUtils {
     }
 
     // Send data
-    val status = client.appendBatch(inputEvents.toList)
+    val status = client.appendBatch(inputEvents.asJava)
     if (status != avro.Status.OK) {
       throw new AssertionError("Sent events unsuccessfully")
     }
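
The writeInput change is typical of the patch at API boundaries: keep a Scala Seq on the Scala side and convert once with .asJava at the call into the Java (Avro) client. A hedged sketch of that boundary with a stand-in for appendBatch:

    import scala.collection.JavaConverters._

    object BoundarySketch {
      // Stand-in for the Avro client's Java-facing batch method.
      def appendBatch(events: java.util.List[String]): Int = events.size()

      def main(args: Array[String]): Unit = {
        val input: Seq[String] = Seq("event-1", "event-2")
        // Convert exactly once, at the Java API boundary.
        assert(appendBatch(input.asJava) == 2)
      }
    }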

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index a65a9b9..c719b80 100644
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
 import java.io.{DataOutputStream, ByteArrayOutputStream}
 import java.util.{List => JList, Map => JMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.api.java.function.PairFunction
 import org.apache.spark.api.python.PythonRDD
@@ -268,8 +268,8 @@ private[flume] class FlumeUtilsPythonHelper {
       maxBatchSize: Int,
       parallelism: Int
     ): JavaPairDStream[Array[Byte], Array[Byte]] = {
-    assert(hosts.length == ports.length)
-    val addresses = hosts.zip(ports).map {
+    assert(hosts.size() == ports.size())
+    val addresses = hosts.asScala.zip(ports.asScala).map {
       case (host, port) => new InetSocketAddress(host, port)
     }
     val dstream = FlumeUtils.createPollingStream(
@@ -286,7 +286,7 @@ private object FlumeUtilsPythonHelper {
     val output = new DataOutputStream(byteStream)
     try {
       output.writeInt(map.size)
-      map.foreach { kv =>
+      map.asScala.foreach { kv =>
         PythonRDD.writeUTF(kv._1.toString, output)
         PythonRDD.writeUTF(kv._2.toString, output)
       }

