[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters

Replace `JavaConversions` implicits with `JavaConverters`

Most occurrences I've seen so far are necessary conversions; a few have been 
avoidable. None are in critical code as far as I can see, yet. The overall 
migration pattern is sketched below.
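
For illustration, the mechanical shape of the change is to drop the wildcard 
`JavaConversions` import (whose implicits convert collections silently) and call 
the explicit `asScala` / `asJava` decorators from `JavaConverters` instead. A 
minimal sketch, with illustrative names that are not taken from the patch:

    import java.util.{List => JList}
    import scala.collection.JavaConverters._   // explicit decorators instead of JavaConversions implicits

    object ConvertersSketch {
      // Before (implicit): with `import scala.collection.JavaConversions._` in scope,
      // a java.util.List was silently wrapped as a Scala Buffer:
      //   def firstOrNone(xs: JList[String]): Option[String] = xs.headOption

      // After (explicit): the conversion is spelled out at the call site.
      def firstOrNone(xs: JList[String]): Option[String] = xs.asScala.headOption

      // Going the other way, asJava wraps a Scala Seq as a java.util.List view.
      def asJavaList(xs: Seq[String]): JList[String] = xs.asJava
    }

Both decorators return wrappers over the original collection rather than copies, 
so the explicit form costs no more than the implicit one.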

Author: Sean Owen <so...@cloudera.com>

Closes #8033 from srowen/SPARK-9613.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69c9c177
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69c9c177
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69c9c177

Branch: refs/heads/master
Commit: 69c9c177160e32a2fbc9b36ecc52156077fca6fc
Parents: 7f1e507
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Aug 25 12:33:13 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Aug 25 12:33:13 2015 +0100

----------------------------------------------------------------------
 .../shuffle/unsafe/UnsafeShuffleWriter.java     |   4 +-
 .../org/apache/spark/MapOutputTracker.scala     |   4 +-
 .../scala/org/apache/spark/SSLOptions.scala     |  11 +-
 .../scala/org/apache/spark/SparkContext.scala   |   4 +-
 .../main/scala/org/apache/spark/TestUtils.scala |   9 +-
 .../apache/spark/api/java/JavaHadoopRDD.scala   |   4 +-
 .../spark/api/java/JavaNewHadoopRDD.scala       |   4 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |  19 ++--
 .../org/apache/spark/api/java/JavaRDDLike.scala |  75 +++++--------
 .../spark/api/java/JavaSparkContext.scala       |  20 ++--
 .../spark/api/python/PythonHadoopUtil.scala     |  28 ++---
 .../org/apache/spark/api/python/PythonRDD.scala |  26 ++---
 .../apache/spark/api/python/PythonUtils.scala   |  15 ++-
 .../spark/api/python/PythonWorkerFactory.scala  |  11 +-
 .../org/apache/spark/api/python/SerDeUtil.scala |   3 +-
 .../WriteInputFormatTestDataGenerator.scala     |   8 +-
 .../scala/org/apache/spark/api/r/RRDD.scala     |  13 ++-
 .../scala/org/apache/spark/api/r/RUtils.scala   |   5 +-
 .../scala/org/apache/spark/api/r/SerDe.scala    |   4 +-
 .../spark/broadcast/TorrentBroadcast.scala      |   4 +-
 .../spark/deploy/ExternalShuffleService.scala   |   8 +-
 .../org/apache/spark/deploy/PythonRunner.scala  |   4 +-
 .../org/apache/spark/deploy/RPackageUtils.scala |   4 +-
 .../scala/org/apache/spark/deploy/RRunner.scala |   4 +-
 .../apache/spark/deploy/SparkCuratorUtil.scala  |   4 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  19 ++--
 .../spark/deploy/SparkSubmitArguments.scala     |   6 +-
 .../master/ZooKeeperPersistenceEngine.scala     |   6 +-
 .../spark/deploy/worker/CommandUtils.scala      |   5 +-
 .../spark/deploy/worker/DriverRunner.scala      |   8 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   7 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   1 -
 .../org/apache/spark/executor/Executor.scala    |   6 +-
 .../apache/spark/executor/ExecutorSource.scala  |   4 +-
 .../spark/executor/MesosExecutorBackend.scala   |   6 +-
 .../apache/spark/input/PortableDataStream.scala |  11 +-
 .../spark/input/WholeTextFileInputFormat.scala  |   8 +-
 .../spark/launcher/WorkerCommandBuilder.scala   |   4 +-
 .../apache/spark/metrics/MetricsConfig.scala    |  22 ++--
 .../network/netty/NettyBlockRpcServer.scala     |   4 +-
 .../netty/NettyBlockTransferService.scala       |   6 +-
 .../apache/spark/network/nio/Connection.scala   |   4 +-
 .../spark/partial/GroupedCountEvaluator.scala   |  10 +-
 .../spark/partial/GroupedMeanEvaluator.scala    |  10 +-
 .../spark/partial/GroupedSumEvaluator.scala     |  10 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   6 +-
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |   6 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |   4 +-
 .../spark/scheduler/InputFormatInfo.scala       |   4 +-
 .../scala/org/apache/spark/scheduler/Pool.scala |  10 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  20 ++--
 .../mesos/MesosClusterPersistenceEngine.scala   |   4 +-
 .../cluster/mesos/MesosClusterScheduler.scala   |  14 +--
 .../cluster/mesos/MesosSchedulerBackend.scala   |  22 ++--
 .../cluster/mesos/MesosSchedulerUtils.scala     |  25 ++---
 .../spark/serializer/KryoSerializer.scala       |  10 +-
 .../shuffle/FileShuffleBlockResolver.scala      |   8 +-
 .../storage/BlockManagerMasterEndpoint.scala    |   8 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |   4 +-
 .../org/apache/spark/util/ListenerBus.scala     |   7 +-
 .../spark/util/MutableURLClassLoader.scala      |   2 -
 .../apache/spark/util/TimeStampedHashMap.scala  |  10 +-
 .../apache/spark/util/TimeStampedHashSet.scala  |   4 +-
 .../scala/org/apache/spark/util/Utils.scala     |  20 ++--
 .../apache/spark/util/collection/Utils.scala    |   4 +-
 .../java/org/apache/spark/JavaAPISuite.java     |   6 +-
 .../scala/org/apache/spark/SparkConfSuite.scala |   7 +-
 .../spark/deploy/LogUrlsStandaloneSuite.scala   |   1 -
 .../spark/deploy/RPackageUtilsSuite.scala       |   8 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   5 +-
 .../spark/scheduler/SparkListenerSuite.scala    |   9 +-
 .../mesos/MesosSchedulerBackendSuite.scala      |  15 +--
 .../spark/serializer/KryoSerializerSuite.scala  |   3 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   |  21 ++--
 .../spark/examples/CassandraCQLTest.scala       |  15 +--
 .../apache/spark/examples/CassandraTest.scala   |   6 +-
 .../spark/examples/DriverSubmissionTest.scala   |   6 +-
 .../pythonconverters/AvroConverters.scala       |  16 +--
 .../pythonconverters/CassandraConverters.scala  |  14 ++-
 .../pythonconverters/HBaseConverters.scala      |   5 +-
 .../streaming/flume/sink/SparkSinkSuite.scala   |   4 +-
 .../streaming/flume/EventTransformer.scala      |   4 +-
 .../streaming/flume/FlumeBatchFetcher.scala     |   3 +-
 .../streaming/flume/FlumeInputDStream.scala     |   7 +-
 .../flume/FlumePollingInputDStream.scala        |   6 +-
 .../spark/streaming/flume/FlumeTestUtils.scala  |  10 +-
 .../spark/streaming/flume/FlumeUtils.scala      |   8 +-
 .../streaming/flume/PollingFlumeTestUtils.scala |  16 ++-
 .../flume/FlumePollingStreamSuite.scala         |   8 +-
 .../streaming/flume/FlumeStreamSuite.scala      |   2 +-
 .../spark/streaming/kafka/KafkaTestUtils.scala  |   4 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |  35 +++---
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |  15 ++-
 .../kinesis/KinesisBackedBlockRDD.scala         |   4 +-
 .../streaming/kinesis/KinesisReceiver.scala     |   4 +-
 .../streaming/kinesis/KinesisTestUtils.scala    |   3 +-
 .../kinesis/KinesisReceiverSuite.scala          |  12 +-
 .../spark/mllib/util/LinearDataGenerator.scala  |   4 +-
 .../ml/classification/JavaOneVsRestSuite.java   |   7 +-
 .../LogisticRegressionSuite.scala               |   4 +-
 .../spark/mllib/classification/SVMSuite.scala   |   4 +-
 .../optimization/GradientDescentSuite.scala     |   4 +-
 .../spark/mllib/recommendation/ALSSuite.scala   |   4 +-
 project/SparkBuild.scala                        |   8 +-
 python/pyspark/sql/column.py                    |  12 ++
 python/pyspark/sql/dataframe.py                 |   4 +-
 scalastyle-config.xml                           |   7 ++
 .../main/scala/org/apache/spark/sql/Row.scala   |  12 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   |   4 +-
 .../apache/spark/sql/DataFrameNaFunctions.scala |   8 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   4 +-
 .../org/apache/spark/sql/GroupedData.scala      |   4 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  13 ++-
 .../scala/org/apache/spark/sql/SQLContext.scala |   8 +-
 .../datasources/ResolvedDataSource.scala        |   4 +-
 .../parquet/CatalystReadSupport.scala           |   8 +-
 .../parquet/CatalystRowConverter.scala          |   4 +-
 .../parquet/CatalystSchemaConverter.scala       |   4 +-
 .../datasources/parquet/ParquetRelation.scala   |  13 ++-
 .../parquet/ParquetTypesConverter.scala         |   4 +-
 .../execution/joins/ShuffledHashOuterJoin.scala |   6 +-
 .../apache/spark/sql/execution/pythonUDFs.scala |  11 +-
 .../apache/spark/sql/JavaDataFrameSuite.java    |   8 +-
 .../spark/sql/DataFrameNaFunctionsSuite.scala   |   6 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |   4 +-
 .../parquet/ParquetAvroCompatibilitySuite.scala |   3 +-
 .../parquet/ParquetCompatibilityTest.scala      |   7 +-
 .../datasources/parquet/ParquetIOSuite.scala    |  25 +++--
 .../SparkExecuteStatementOperation.scala        |  10 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |  16 +--
 .../hive/thriftserver/SparkSQLCLIService.scala  |   6 +-
 .../sql/hive/thriftserver/SparkSQLDriver.scala  |  14 +--
 .../sql/hive/thriftserver/SparkSQLEnv.scala     |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   4 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  |  40 +++----
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      | 110 +++++++++++--------
 .../org/apache/spark/sql/hive/HiveShim.scala    |   5 +-
 .../spark/sql/hive/client/ClientWrapper.scala   |  27 ++---
 .../apache/spark/sql/hive/client/HiveShim.scala |  14 +--
 .../execution/DescribeHiveTableCommand.scala    |   8 +-
 .../sql/hive/execution/HiveTableScan.scala      |   9 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  12 +-
 .../hive/execution/ScriptTransformation.scala   |  12 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |   9 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |   8 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  11 +-
 .../spark/sql/hive/client/FiltersSuite.scala    |   4 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala |  29 +++--
 .../spark/sql/hive/execution/PruningSuite.scala |   7 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |   8 +-
 .../streaming/api/java/JavaDStreamLike.scala    |  12 +-
 .../streaming/api/java/JavaPairDStream.scala    |  28 ++---
 .../api/java/JavaStreamingContext.scala         |  32 +++---
 .../streaming/api/python/PythonDStream.scala    |   5 +-
 .../spark/streaming/receiver/Receiver.scala     |   6 +-
 .../streaming/scheduler/JobScheduler.scala      |   4 +-
 .../scheduler/ReceivedBlockTracker.scala        |   4 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |   4 +-
 .../apache/spark/streaming/JavaTestUtils.scala  |  24 ++--
 .../streaming/util/WriteAheadLogSuite.scala     |   4 +-
 .../apache/spark/tools/GenerateMIMAIgnore.scala |   6 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  13 ++-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  24 ++--
 .../spark/deploy/yarn/YarnAllocator.scala       |  19 ++--
 .../apache/spark/deploy/yarn/YarnRMClient.scala |   8 +-
 .../deploy/yarn/BaseYarnClusterSuite.scala      |   6 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |   8 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    |   5 +-
 171 files changed, 863 insertions(+), 880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index 2389c28..fdb309e 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 
 import scala.Option;
 import scala.Product2;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 import scala.collection.immutable.Map;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
@@ -160,7 +160,7 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
    */
   @VisibleForTesting
   public void write(Iterator<Product2<K, V>> records) throws IOException {
-    write(JavaConversions.asScalaIterator(records));
+    write(JavaConverters.asScalaIteratorConverter(records).asScala());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 9221883..a387592 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,8 +21,8 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, 
RpcEndpoint}
@@ -398,7 +398,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends 
MapOutputTracker(conf) {
   protected val mapStatuses: Map[Int, Array[MapStatus]] =
-    new ConcurrentHashMap[Int, Array[MapStatus]]
+    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
 }
 
 private[spark] object MapOutputTracker extends Logging {
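
In the MapOutputTrackerWorker hunk above, the explicit `asScala` replaces what was 
previously an implicit wrap of the ConcurrentHashMap; in both cases the result is 
a live view over the same underlying Java map rather than a copy. A standalone 
sketch of that behaviour (names are illustrative, not from the patch):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object AsScalaViewSketch extends App {
      val jmap = new ConcurrentHashMap[Int, String]()
      val view = jmap.asScala        // wraps jmap; nothing is copied

      jmap.put(1, "one")             // visible through the Scala view
      view(2) = "two"                // visible through the Java map
      assert(view.size == 2 && jmap.size == 2)
    }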

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/SSLOptions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala 
b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 32df42d..3b9c885 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark
 
-import java.io.{File, FileInputStream}
-import java.security.{KeyStore, NoSuchAlgorithmException}
-import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, 
TrustManagerFactory}
+import java.io.File
+import java.security.NoSuchAlgorithmException
+import javax.net.ssl.SSLContext
+
+import scala.collection.JavaConverters._
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
@@ -79,7 +81,6 @@ private[spark] case class SSLOptions(
    * object. It can be used then to compose the ultimate Akka configuration.
    */
   def createAkkaConfig: Option[Config] = {
-    import scala.collection.JavaConversions._
     if (enabled) {
       Some(ConfigFactory.empty()
         .withValue("akka.remote.netty.tcp.security.key-store",
@@ -97,7 +98,7 @@ private[spark] case class SSLOptions(
         .withValue("akka.remote.netty.tcp.security.protocol",
           ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
         .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
+          ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
         .withValue("akka.remote.netty.tcp.enable-ssl",
           ConfigValueFactory.fromAnyRef(true)))
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9849aff..f3da04a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,8 +26,8 @@ import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, 
AtomicInteger}
 import java.util.UUID.randomUUID
 
+import scala.collection.JavaConverters._
 import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -1546,7 +1546,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   def getAllPools: Seq[Schedulable] = {
     assertNotStopped()
     // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.toSeq
+    taskScheduler.rootPool.schedulableQueue.asScala.toSeq
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index a1ebbec..888763a 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -19,11 +19,12 @@ package org.apache.spark
 
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{URI, URL}
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
 import java.util.jar.{JarEntry, JarOutputStream}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
-import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
@@ -71,7 +72,7 @@ private[spark] object TestUtils {
     files.foreach { case (k, v) =>
       val entry = new JarEntry(k)
       jarStream.putNextEntry(entry)
-      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream)
+      ByteStreams.copy(new 
ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream)
     }
     jarStream.close()
     jarFile.toURI.toURL
@@ -125,7 +126,7 @@ private[spark] object TestUtils {
     } else {
       Seq()
     }
-    compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()
+    compiler.getTask(null, null, null, options.asJava, null, 
Arrays.asList(sourceFile)).call()
 
     val fileName = className + ".class"
     val result = new File(fileName)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
index 0ae0b4e..891bcdd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.mapred.InputSplit
@@ -37,7 +37,7 @@ class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
   def mapPartitionsWithInputSplit[R](
       f: JFunction2[InputSplit, java.util.Iterator[(K, V)], 
java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] = {
-    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, 
asJavaIterator(b)),
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, 
b.asJava).asScala,
       preservesPartitioning)(fakeClassTag))(fakeClassTag)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
index ec4f396..0f49279 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.mapreduce.InputSplit
@@ -37,7 +37,7 @@ class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
   def mapPartitionsWithInputSplit[R](
       f: JFunction2[InputSplit, java.util.Iterator[(K, V)], 
java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] = {
-    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, 
asJavaIterator(b)),
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, 
b.asJava).asScala,
       preservesPartitioning)(fakeClassTag))(fakeClassTag)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 8441bb3..fb78797 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.api.java
 import java.util.{Comparator, List => JList, Map => JMap}
 import java.lang.{Iterable => JIterable}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -142,7 +142,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def sampleByKey(withReplacement: Boolean,
       fractions: JMap[K, Double],
       seed: Long): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
+    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, 
seed))
 
   /**
    * Return a subset of this RDD sampled by key (via stratified sampling).
@@ -173,7 +173,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def sampleByKeyExact(withReplacement: Boolean,
       fractions: JMap[K, Double],
       seed: Long): JavaPairRDD[K, V] =
-    new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, 
seed))
+    new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, 
fractions.asScala, seed))
 
   /**
    * ::Experimental::
@@ -768,7 +768,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Return the list of values in the RDD for key `key`. This operation is 
done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key 
maps to.
    */
-  def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key))
+  def lookup(key: K): JList[V] = rdd.lookup(key).asJava
 
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsHadoopFile[F <: OutputFormat[_, _]](
@@ -987,30 +987,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 object JavaPairRDD {
   private[spark]
   def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, 
JIterable[T])] = {
-    rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
+    rddToPairRDDFunctions(rdd).mapValues(_.asJava)
   }
 
   private[spark]
   def cogroupResultToJava[K: ClassTag, V, W](
       rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], 
JIterable[W]))] = {
-    rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), 
asJavaIterable(x._2)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava))
   }
 
   private[spark]
   def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
       rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
       : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
-    rddToPairRDDFunctions(rdd)
-      .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), 
asJavaIterable(x._3)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, 
x._3.asJava))
   }
 
   private[spark]
   def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3](
       rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))])
   : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = {
-    rddToPairRDDFunctions(rdd)
-      .mapValues(x =>
-        (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), 
asJavaIterable(x._4)))
+    rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, 
x._3.asJava, x._4.asJava))
   }
 
   def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = 
{

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c582488..fc817cd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,7 +21,6 @@ import java.{lang => jl}
 import java.lang.{Iterable => JIterable, Long => JLong}
 import java.util.{Comparator, List => JList, Iterator => JIterator}
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -59,10 +58,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def rdd: RDD[T]
 
   @deprecated("Use partitions() instead.", "1.1.0")
-  def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
+  def splits: JList[Partition] = rdd.partitions.toSeq.asJava
 
   /** Set of partitions in this RDD. */
-  def partitions: JList[Partition] = new 
java.util.ArrayList(rdd.partitions.toSeq)
+  def partitions: JList[Partition] = rdd.partitions.toSeq.asJava
 
   /** The partitioner of this RDD. */
   def partitioner: Optional[Partitioner] = 
JavaUtils.optionToOptional(rdd.partitioner)
@@ -82,7 +81,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * subclasses of RDD.
    */
   def iterator(split: Partition, taskContext: TaskContext): 
java.util.Iterator[T] =
-    asJavaIterator(rdd.iterator(split, taskContext))
+    rdd.iterator(split, taskContext).asJava
 
   // Transformations (return a new RDD)
 
@@ -99,7 +98,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def mapPartitionsWithIndex[R](
       f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
-    new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))),
+    new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, 
b.asJava).asScala,
         preservesPartitioning)(fakeClassTag))(fakeClassTag)
 
   /**
@@ -153,7 +152,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): 
JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -164,7 +163,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     JavaRDD.fromRDD(
       rdd.mapPartitions(fn, 
preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -175,7 +174,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): 
JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => 
x.doubleValue()))
   }
@@ -186,7 +185,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def mapPartitionsToPair[K2, V2](f: 
PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], 
fakeClassTag[V2])
   }
@@ -197,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
       .map(x => x.doubleValue()))
@@ -209,7 +208,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def mapPartitionsToPair[K2, V2](f: 
PairFlatMapFunction[java.util.Iterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
     }
     JavaPairRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], 
fakeClassTag[V2])
@@ -219,14 +218,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
-    rdd.foreachPartition((x => f.call(asJavaIterator(x))))
+    rdd.foreachPartition((x => f.call(x.asJava)))
   }
 
   /**
    * Return an RDD created by coalescing all elements within each partition 
into an array.
    */
   def glom(): JavaRDD[JList[T]] =
-    new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+    new JavaRDD(rdd.glom().map(_.toSeq.asJava))
 
   /**
    * Return the Cartesian product of this RDD and another one, that is, the 
RDD of all pairs of
@@ -266,13 +265,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String]): JavaRDD[String] =
-    rdd.pipe(asScalaBuffer(command))
+    rdd.pipe(command.asScala)
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: JList[String], env: java.util.Map[String, String]): 
JavaRDD[String] =
-    rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
+    rdd.pipe(command.asScala, env.asScala)
 
   /**
    * Zips this RDD with another one, returning key-value pairs with the first 
element in each RDD,
@@ -294,8 +293,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): 
JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
-        f.call(asJavaIterator(x), asJavaIterator(y)).iterator())
+      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, 
y.asJava).iterator().asScala
     }
     JavaRDD.fromRDD(
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, 
fakeClassTag[V]))(fakeClassTag[V])
@@ -333,22 +331,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
   /**
    * Return an array that contains all of the elements in this RDD.
    */
-  def collect(): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.collect().toSeq
-    new java.util.ArrayList(arr)
-  }
+  def collect(): JList[T] =
+    rdd.collect().toSeq.asJava
 
   /**
    * Return an iterator that contains all of the elements in this RDD.
    *
    * The iterator will consume as much memory as the largest partition in this 
RDD.
    */
-  def toLocalIterator(): JIterator[T] = {
-     import scala.collection.JavaConversions._
-     rdd.toLocalIterator
-  }
-
+  def toLocalIterator(): JIterator[T] =
+     asJavaIteratorConverter(rdd.toLocalIterator).asJava
 
   /**
    * Return an array that contains all of the elements in this RDD.
@@ -363,9 +355,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
   def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
     // This is useful for implementing `take` from other language frontends
     // like Python where the data is serialized.
-    import scala.collection.JavaConversions._
     val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, 
partitionIds)
-    res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+    res.map(_.toSeq.asJava)
   }
 
   /**
@@ -489,20 +480,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
    * it will be slow if a lot of partitions are required. In that case, use 
collect() to get the
    * whole RDD instead.
    */
-  def take(num: Int): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.take(num).toSeq
-    new java.util.ArrayList(arr)
-  }
+  def take(num: Int): JList[T] =
+    rdd.take(num).toSeq.asJava
 
   def takeSample(withReplacement: Boolean, num: Int): JList[T] =
     takeSample(withReplacement, num, Utils.random.nextLong)
 
-  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = {
-    import scala.collection.JavaConversions._
-    val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, 
seed).toSeq
-    new java.util.ArrayList(arr)
-  }
+  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] =
+    rdd.takeSample(withReplacement, num, seed).toSeq.asJava
 
   /**
    * Return the first element in this RDD.
@@ -582,10 +567,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * @return an array of top elements
    */
   def top(num: Int, comp: Comparator[T]): JList[T] = {
-    import scala.collection.JavaConversions._
-    val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
-    val arr: java.util.Collection[T] = topElems.toSeq
-    new java.util.ArrayList(arr)
+    rdd.top(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
   }
 
   /**
@@ -607,10 +589,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * @return an array of top elements
    */
   def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
-    import scala.collection.JavaConversions._
-    val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
-    val arr: java.util.Collection[T] = topElems.toSeq
-    new java.util.ArrayList(arr)
+    rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava
   }
 
   /**
@@ -696,7 +675,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends 
Serializable {
    * applies a function f to each partition of this RDD.
    */
   def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): 
JavaFutureAction[Void] = {
-    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => 
f.call(x)),
+    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => 
f.call(x.asJava)),
       { x => null.asInstanceOf[Void] })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 02e49a8..609496c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,8 +21,7 @@ import java.io.Closeable
 import java.util
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -104,7 +103,7 @@ class JavaSparkContext(val sc: SparkContext)
    */
   def this(master: String, appName: String, sparkHome: String, jars: 
Array[String],
       environment: JMap[String, String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, 
Map()))
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, 
environment.asScala, Map()))
 
   private[spark] val env = sc.env
 
@@ -118,7 +117,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   def appName: String = sc.appName
 
-  def jars: util.List[String] = sc.jars
+  def jars: util.List[String] = sc.jars.asJava
 
   def startTime: java.lang.Long = sc.startTime
 
@@ -142,7 +141,7 @@ class JavaSparkContext(val sc: SparkContext)
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
-    sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
+    sc.parallelize(list.asScala, numSlices)
   }
 
   /** Get an RDD that has no partitions or elements. */
@@ -161,7 +160,7 @@ class JavaSparkContext(val sc: SparkContext)
   : JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[V] = fakeClassTag
-    JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), 
numSlices))
+    JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
   }
 
   /** Distribute a local Scala collection to form an RDD. */
@@ -170,8 +169,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: 
Int): JavaDoubleRDD =
-    
JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
-      numSlices))
+    JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), 
numSlices))
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelizeDoubles(list: java.util.List[java.lang.Double]): 
JavaDoubleRDD =
@@ -519,7 +517,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): 
JavaRDD[T] = {
-    val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+    val rdds: Seq[RDD[T]] = (Seq(first) ++ rest.asScala).map(_.rdd)
     implicit val ctag: ClassTag[T] = first.classTag
     sc.union(rdds)
   }
@@ -527,7 +525,7 @@ class JavaSparkContext(val sc: SparkContext)
   /** Build the union of two or more RDDs. */
   override def union[K, V](first: JavaPairRDD[K, V], rest: 
java.util.List[JavaPairRDD[K, V]])
       : JavaPairRDD[K, V] = {
-    val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+    val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.rdd)
     implicit val ctag: ClassTag[(K, V)] = first.classTag
     implicit val ctagK: ClassTag[K] = first.kClassTag
     implicit val ctagV: ClassTag[V] = first.vClassTag
@@ -536,7 +534,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   /** Build the union of two or more RDDs. */
   override def union(first: JavaDoubleRDD, rest: 
java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
-    val rdds: Seq[RDD[Double]] = (Seq(first) ++ 
asScalaBuffer(rest)).map(_.srdd)
+    val rdds: Seq[RDD[Double]] = (Seq(first) ++ rest.asScala).map(_.srdd)
     new JavaDoubleRDD(sc.union(rdds))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index b959b68..a7dfa1d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.api.python
 
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException}
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
-import scala.util.{Failure, Success, Try}
-import org.apache.spark.annotation.Experimental
 
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * :: Experimental ::
@@ -68,7 +70,6 @@ private[python] class WritableToJavaConverter(
    * object representation
    */
   private def convertWritable(writable: Writable): Any = {
-    import collection.JavaConversions._
     writable match {
       case iw: IntWritable => iw.get()
       case dw: DoubleWritable => dw.get()
@@ -89,9 +90,7 @@ private[python] class WritableToJavaConverter(
         aw.get().map(convertWritable(_))
       case mw: MapWritable =>
         val map = new java.util.HashMap[Any, Any]()
-        mw.foreach { case (k, v) =>
-          map.put(convertWritable(k), convertWritable(v))
-        }
+        mw.asScala.foreach { case (k, v) => map.put(convertWritable(k), 
convertWritable(v)) }
         map
       case w: Writable => WritableUtils.clone(w, conf.value.value)
       case other => other
@@ -122,7 +121,6 @@ private[python] class JavaToWritableConverter extends 
Converter[Any, Writable] {
    * supported out-of-the-box.
    */
   private def convertToWritable(obj: Any): Writable = {
-    import collection.JavaConversions._
     obj match {
       case i: java.lang.Integer => new IntWritable(i)
       case d: java.lang.Double => new DoubleWritable(d)
@@ -134,7 +132,7 @@ private[python] class JavaToWritableConverter extends 
Converter[Any, Writable] {
       case null => NullWritable.get()
       case map: java.util.Map[_, _] =>
         val mapWritable = new MapWritable()
-        map.foreach { case (k, v) =>
+        map.asScala.foreach { case (k, v) =>
           mapWritable.put(convertToWritable(k), convertToWritable(v))
         }
         mapWritable
@@ -161,9 +159,8 @@ private[python] object PythonHadoopUtil {
    * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Configuration]]
    */
   def mapToConf(map: java.util.Map[String, String]): Configuration = {
-    import collection.JavaConversions._
     val conf = new Configuration()
-    map.foreach{ case (k, v) => conf.set(k, v) }
+    map.asScala.foreach { case (k, v) => conf.set(k, v) }
     conf
   }
 
@@ -172,9 +169,8 @@ private[python] object PythonHadoopUtil {
    * any matching keys in left
    */
   def mergeConfs(left: Configuration, right: Configuration): Configuration = {
-    import collection.JavaConversions._
     val copy = new Configuration(left)
-    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
+    right.asScala.foreach(entry => copy.set(entry.getKey, entry.getValue))
     copy
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2a56bf2..b4d152b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net._
 import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => 
JMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
@@ -66,11 +66,11 @@ private[spark] class PythonRDD(
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(
       f => f.getPath()).mkString(",")
-    envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor 
thread
+    envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
     if (reuse_worker) {
-      envVars += ("SPARK_REUSE_WORKER" -> "1")
+      envVars.put("SPARK_REUSE_WORKER", "1")
     }
-    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, 
envVars.asScala.toMap)
     // Whether is the worker released into idle pool
     @volatile var released = false
 
@@ -150,7 +150,7 @@ private[spark] class PythonRDD(
               // Check whether the worker is ready to be re-used.
               if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
                 if (reuse_worker) {
-                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                  env.releasePythonWorker(pythonExec, envVars.asScala.toMap, 
worker)
                   released = true
                 }
               }
@@ -217,13 +217,13 @@ private[spark] class PythonRDD(
         // sparkFilesDir
         PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut)
         // Python includes (*.zip and *.egg files)
-        dataOut.writeInt(pythonIncludes.length)
-        for (include <- pythonIncludes) {
+        dataOut.writeInt(pythonIncludes.size())
+        for (include <- pythonIncludes.asScala) {
           PythonRDD.writeUTF(include, dataOut)
         }
         // Broadcast variables
         val oldBids = PythonRDD.getWorkerBroadcasts(worker)
-        val newBids = broadcastVars.map(_.id).toSet
+        val newBids = broadcastVars.asScala.map(_.id).toSet
         // number of different broadcasts
         val toRemove = oldBids.diff(newBids)
         val cnt = toRemove.size + newBids.diff(oldBids).size
@@ -233,7 +233,7 @@ private[spark] class PythonRDD(
           dataOut.writeLong(- bid - 1)  // bid >= 0
           oldBids.remove(bid)
         }
-        for (broadcast <- broadcastVars) {
+        for (broadcast <- broadcastVars.asScala) {
           if (!oldBids.contains(broadcast.id)) {
             // send new broadcast
             dataOut.writeLong(broadcast.id)
@@ -287,7 +287,7 @@ private[spark] class PythonRDD(
       if (!context.isCompleted) {
         try {
           logWarning("Incomplete task interrupted: Attempting to kill Python 
Worker")
-          env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
+          env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
         } catch {
           case e: Exception =>
             logError("Exception when trying to kill worker", e)
@@ -358,10 +358,10 @@ private[spark] object PythonRDD extends Logging {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
-      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
     serveIterator(flattenedPartition.iterator,
-      s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+      s"serve RDD ${rdd.id} with partitions 
${partitions.asScala.mkString(",")}")
   }
 
   /**
@@ -819,7 +819,7 @@ private class PythonAccumulatorParam(@transient serverHost: 
String, serverPort:
       val in = socket.getInputStream
       val out = new DataOutputStream(new 
BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)
-      for (array <- val2) {
+      for (array <- val2.asScala) {
         out.writeInt(array.length)
         out.write(array)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 90dacae..31e534f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.api.python
 
-import java.io.{File}
+import java.io.File
 import java.util.{List => JList}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkContext
@@ -51,7 +51,14 @@ private[spark] object PythonUtils {
    * Convert list of T into seq of T (for calling API with varargs)
    */
   def toSeq[T](vs: JList[T]): Seq[T] = {
-    vs.toList.toSeq
+    vs.asScala
+  }
+
+  /**
+   * Convert list of T into a (Scala) List of T
+   */
+  def toList[T](vs: JList[T]): List[T] = {
+    vs.asScala.toList
   }
 
   /**
@@ -65,6 +72,6 @@ private[spark] object PythonUtils {
    * Convert java map of K, V into Map of K, V (for calling API with varargs)
    */
   def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = {
-    jm.toMap
+    jm.asScala.toMap
   }
 }
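
The PythonUtils hunk above also shows the two flavours of conversion: `vs.asScala` 
alone returns a wrapper over the Java list, while `jm.asScala.toMap` forces an 
independent immutable copy. A small illustrative sketch (names not from the patch):

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConverters._

    object CopyVsViewSketch extends App {
      val jm = new JHashMap[String, Int]()
      jm.put("a", 1)

      val view = jm.asScala          // live view: later mutations show up here
      val copy = jm.asScala.toMap    // immutable snapshot taken now

      jm.put("b", 2)
      assert(view.contains("b"))     // the view tracks the Java map
      assert(!copy.contains("b"))    // the snapshot does not
    }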

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index e314408..7039b73 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -19,9 +19,10 @@ package org.apache.spark.api.python
 
 import java.io.{DataOutputStream, DataInputStream, InputStream, 
OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.util.Arrays
 
 import scala.collection.mutable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.util.{RedirectThread, Utils}
@@ -108,9 +109,9 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, 
InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", 
"pyspark.worker"))
       val workerEnv = pb.environment()
-      workerEnv.putAll(envVars)
+      workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
       // This is equivalent to setting the -u flag; we use it because ipython 
doesn't support -u:
       workerEnv.put("PYTHONUNBUFFERED", "YES")
@@ -151,9 +152,9 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", 
"pyspark.daemon"))
         val workerEnv = pb.environment()
-        workerEnv.putAll(envVars)
+        workerEnv.putAll(envVars.asJava)
         workerEnv.put("PYTHONPATH", pythonPath)
         // This is equivalent to setting the -u flag; we use it because 
ipython doesn't support -u:
         workerEnv.put("PYTHONUNBUFFERED", "YES")

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 1f1debc..fd27276 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -22,7 +22,6 @@ import java.util.{ArrayList => JArrayList}
 
 import org.apache.spark.api.java.JavaRDD
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Failure
@@ -214,7 +213,7 @@ private[spark] object SerDeUtil extends Logging {
         new AutoBatchedPickler(cleaned)
       } else {
         val pickle = new Pickler
-        cleaned.grouped(batchSize).map(batched => 
pickle.dumps(seqAsJavaList(batched)))
+        cleaned.grouped(batchSize).map(batched => pickle.dumps(batched.asJava))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index 8f30ff9..ee1fb05 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -20,6 +20,8 @@ package org.apache.spark.api.python
 import java.io.{DataOutput, DataInput}
 import java.{util => ju}
 
+import scala.collection.JavaConverters._
+
 import com.google.common.base.Charsets.UTF_8
 
 import org.apache.hadoop.io._
@@ -62,10 +64,9 @@ private[python] class TestInputKeyConverter extends 
Converter[Any, Any] {
 }
 
 private[python] class TestInputValueConverter extends Converter[Any, Any] {
-  import collection.JavaConversions._
   override def convert(obj: Any): ju.List[Double] = {
     val m = obj.asInstanceOf[MapWritable]
-    seqAsJavaList(m.keySet.map(w => 
w.asInstanceOf[DoubleWritable].get()).toSeq)
+    m.keySet.asScala.map(_.asInstanceOf[DoubleWritable].get()).toSeq.asJava
   }
 }
 
@@ -76,9 +77,8 @@ private[python] class TestOutputKeyConverter extends 
Converter[Any, Any] {
 }
 
 private[python] class TestOutputValueConverter extends Converter[Any, Any] {
-  import collection.JavaConversions._
   override def convert(obj: Any): DoubleWritable = {
-    new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, 
_]].keySet().head)
+    new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, 
_]].keySet().iterator().next())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala 
b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 1cf2824f..9d5bbb5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -19,9 +19,10 @@ package org.apache.spark.api.r
 
 import java.io._
 import java.net.{InetAddress, ServerSocket}
+import java.util.Arrays
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.Try
@@ -365,11 +366,11 @@ private[r] object RRDD {
       sparkConf.setIfMissing("spark.master", "local")
     }
 
-    for ((name, value) <- sparkEnvirMap) {
-      sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
+    for ((name, value) <- sparkEnvirMap.asScala) {
+      sparkConf.set(name.toString, value.toString)
     }
-    for ((name, value) <- sparkExecutorEnvMap) {
-      sparkConf.setExecutorEnv(name.asInstanceOf[String], value.asInstanceOf[String])
+    for ((name, value) <- sparkExecutorEnvMap.asScala) {
+      sparkConf.setExecutorEnv(name.toString, value.toString)
     }
 
     val jsc = new JavaSparkContext(sparkConf)
@@ -395,7 +396,7 @@ private[r] object RRDD {
     val rOptions = "--vanilla"
     val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
     val rExecScript = rLibDir + "/SparkR/worker/" + script
-    val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
+    val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
     // Unset the R_TESTS environment variable for workers.
     // This is set by R CMD check as startup.Rs
     // (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
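
ProcessBuilder only accepts a java.util.List[String] (or varargs), so once
the implicit Seq-to-List view is removed the argument has to be built
explicitly, either with Arrays.asList as above or with .asJava. A sketch
with placeholder arguments:

    import java.util.Arrays
    import scala.collection.JavaConverters._

    // Two equivalent ways to hand a command line to ProcessBuilder.
    val pb1 = new ProcessBuilder(Arrays.asList("R", "--vanilla", "worker.R"))
    val pb2 = new ProcessBuilder(Seq("R", "--vanilla", "worker.R").asJava)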

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 427b2bc..9e807cc 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.File
-
-import scala.collection.JavaConversions._
+import java.util.Arrays
 
 import org.apache.spark.{SparkEnv, SparkException}
 
@@ -68,7 +67,7 @@ private[spark] object RUtils {
   /** Check if R is installed before running tests that use R commands. */
   def isRInstalled: Boolean = {
     try {
-      val builder = new ProcessBuilder(Seq("R", "--version"))
+      val builder = new ProcessBuilder(Arrays.asList("R", "--version"))
       builder.start().waitFor() == 0
     } catch {
       case e: Exception => false

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 3c89f24..dbbbcf4 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -20,7 +20,7 @@ package org.apache.spark.api.r
 import java.io.{DataInputStream, DataOutputStream}
 import java.sql.{Timestamp, Date, Time}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Utility functions to serialize, deserialize objects to / from R
@@ -165,7 +165,7 @@ private[spark] object SerDe {
         val valueType = readObjectType(in)
         readTypedObject(in, valueType)
       })
-      mapAsJavaMap(keys.zip(values).toMap)
+      keys.zip(values).toMap.asJava
     } else {
       new java.util.HashMap[Object, Object]()
     }
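
Here mapAsJavaMap(...) becomes a plain .asJava on the assembled Scala map.
A standalone sketch with placeholder keys and values:

    import scala.collection.JavaConverters._

    val keys = Seq("a", "b")
    val values = Seq(1, 2)
    // zip + toMap builds an immutable Scala Map; .asJava exposes it as a java.util.Map.
    val javaMap: java.util.Map[String, Int] = keys.zip(values).toMap.asJava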

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index a0c9b5e..7e3764d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -20,7 +20,7 @@ package org.apache.spark.broadcast
 import java.io._
 import java.nio.ByteBuffer
 
-import scala.collection.JavaConversions.asJavaEnumeration
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.Random
 
@@ -210,7 +210,7 @@ private object TorrentBroadcast extends Logging {
       compressionCodec: Option[CompressionCodec]): T = {
     require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
     val is = new SequenceInputStream(
-      asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
+      blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration)
     val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
     val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
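
SequenceInputStream expects a java.util.Enumeration of streams, which is why
the iterator is converted with asJavaEnumeration rather than an implicit
view. An illustrative sketch with dummy byte arrays:

    import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
    import scala.collection.JavaConverters._

    val chunks = Seq(Array[Byte](1, 2), Array[Byte](3, 4))
    val streams: Iterator[InputStream] = chunks.iterator.map(new ByteArrayInputStream(_))
    // asJavaEnumeration wraps the Scala Iterator as a java.util.Enumeration.
    val combined = new SequenceInputStream(streams.asJavaEnumeration)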

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 22ef701..6840a3a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -19,13 +19,13 @@ package org.apache.spark.deploy
 
 import java.util.concurrent.CountDownLatch
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.{Logging, SparkConf, SecurityManager}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslServerBootstrap
-import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.Utils
@@ -67,13 +67,13 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   def start() {
     require(server == null, "Shuffle server already started")
     logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
-    val bootstraps =
+    val bootstraps: Seq[TransportServerBootstrap] =
       if (useSasl) {
         Seq(new SaslServerBootstrap(transportConf, securityManager))
       } else {
         Nil
       }
-    server = transportContext.createServer(port, bootstraps)
+    server = transportContext.createServer(port, bootstraps.asJava)
   }
 
   /** Clean up all shuffle files associated with an application that has exited. */
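
The explicit Seq[TransportServerBootstrap] ascription appears to be needed
because java.util.List is invariant: without it, the if/else would infer the
narrower element type, and bootstraps.asJava would not match the list type
createServer expects. A simplified sketch with stand-in types:

    import scala.collection.JavaConverters._

    trait Bootstrap
    class SaslBootstrap extends Bootstrap
    def createServer(bootstraps: java.util.List[Bootstrap]): Unit = ()

    val useSasl = true
    val bootstraps: Seq[Bootstrap] = if (useSasl) Seq(new SaslBootstrap) else Nil
    createServer(bootstraps.asJava)  // without the ascription this would not type-check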

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 23d01e9..d853276 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -21,7 +21,7 @@ import java.net.URI
 import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.Try
 
 import org.apache.spark.SparkUserAppException
@@ -71,7 +71,7 @@ object PythonRunner {
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
 
     // Launch Python process
-    val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
+    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
     val env = builder.environment()
     env.put("PYTHONPATH", pythonPath)
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index ed1e972..4b28866 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -22,7 +22,7 @@ import java.util.jar.JarFile
 import java.util.logging.Level
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.io.{ByteStreams, Files}
 
@@ -110,7 +110,7 @@ private[deploy] object RPackageUtils extends Logging {
       print(s"Building R package with the command: $installCmd", printStream)
     }
     try {
-      val builder = new ProcessBuilder(installCmd)
+      val builder = new ProcessBuilder(installCmd.asJava)
       builder.redirectErrorStream(true)
       val env = builder.environment()
       env.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index c0cab22..05b954c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io._
 import java.util.concurrent.{Semaphore, TimeUnit}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 
@@ -68,7 +68,7 @@ object RRunner {
     if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
       // Launch R
       val returnCode = try {
-        val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
+        val builder = new ProcessBuilder((Seq(rCommand, rFileNormalized) ++ otherArgs).asJava)
         val env = builder.environment()
         env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
         val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
index b8d3993..8d5e716 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
@@ -57,7 +57,7 @@ private[spark] object SparkCuratorUtil extends Logging {
 
   def deleteRecursive(zk: CuratorFramework, path: String) {
     if (zk.checkExists().forPath(path) != null) {
-      for (child <- zk.getChildren.forPath(path)) {
+      for (child <- zk.getChildren.forPath(path).asScala) {
         zk.delete().forPath(path + "/" + child)
       }
       zk.delete().forPath(path)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index dda4216..f7723ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,7 +22,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Comparator}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.control.NonFatal
@@ -71,7 +71,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
-    for (token <- source.getTokens()) {
+    for (token <- source.getTokens.asScala) {
       dest.addToken(token)
     }
   }
@@ -175,8 +175,8 @@ class SparkHadoopUtil extends Logging {
   }
 
   private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
-    val stats = FileSystem.getAllStatistics()
-    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+    FileSystem.getAllStatistics.asScala.map(
+      Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
   }
 
   private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
@@ -306,12 +306,13 @@ class SparkHadoopUtil extends Logging {
     val renewalInterval =
       sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
 
-    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+    credentials.getAllTokens.asScala
+      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
       .map { t =>
-      val identifier = new DelegationTokenIdentifier()
-      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-      (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-    }.foldLeft(0L)(math.max)
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+      }.foldLeft(0L)(math.max)
   }
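
Iterating a Java collection from Scala now takes an explicit .asScala, as in
the token and statistics handling above. A minimal sketch with placeholder
data:

    import scala.collection.JavaConverters._

    val tokens: java.util.List[String] = java.util.Arrays.asList("t1", "t2")
    // .asScala wraps the Java list so it can drive a Scala for-comprehension.
    for (token <- tokens.asScala) {
      println(token)
    }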
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 3f3c662..18a1c52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import java.util.{List => JList}
 import java.util.jar.JarFile
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 
@@ -94,7 +94,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
   // Set parameters from command line arguments
   try {
-    parse(args.toList)
+    parse(args.asJava)
   } catch {
     case e: IllegalArgumentException =>
       SparkSubmit.printErrorAndExit(e.getMessage())
@@ -458,7 +458,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   override protected def handleExtraArgs(extra: JList[String]): Unit = {
-    childArgs ++= extra
+    childArgs ++= extra.asScala
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 563831c..540e802 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
 
 import java.nio.ByteBuffer
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.curator.framework.CuratorFramework
@@ -49,8 +49,8 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
   }
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {
-    val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
-    file.map(deserializeFromFile[T]).flatten
+    zk.getChildren.forPath(WORKING_DIR).asScala
+      .filter(_.startsWith(prefix)).map(deserializeFromFile[T]).flatten
   }
 
   override def close() {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 45a3f43..ce02ee2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -18,9 +18,8 @@
 package org.apache.spark.deploy.worker
 
 import java.io.{File, FileOutputStream, InputStream, IOException}
-import java.lang.System._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 
 import org.apache.spark.Logging
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
     // SPARK-698: do not call the run.cmd script, as process.destroy()
     // fails to kill a process tree on Windows
     val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
-    cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments
+    cmd.asScala ++ Seq(command.mainClass) ++ command.arguments
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index ec51c3d..89159ff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
 
 import java.io._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
@@ -172,8 +172,8 @@ private[deploy] class DriverRunner(
       CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val header = "Launch Command: %s\n%s\n\n".format(
-        builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
+      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
+      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
       Files.append(header, stderr, UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
@@ -229,6 +229,6 @@ private[deploy] trait ProcessBuilderLike {
 private[deploy] object ProcessBuilderLike {
   def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
     override def start(): Process = processBuilder.start()
-    override def command: Seq[String] = processBuilder.command()
+    override def command: Seq[String] = processBuilder.command().asScala
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index ab3fea4..3aef051 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker
 
 import java.io._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
@@ -129,7 +129,8 @@ private[deploy] class ExecutorRunner(
       val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
-      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
+      logInfo(s"Launch command: $formattedCommand")
 
       builder.directory(executorDir)
       builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -145,7 +146,7 @@ private[deploy] class ExecutorRunner(
 
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
-        command.mkString("\"", "\" \"", "\""), "=" * 40)
+        formattedCommand, "=" * 40)
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 79b1536..770927c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -24,7 +24,6 @@ import java.util.{UUID, Date}
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
 import scala.util.Random

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 42a85e4..c3491bb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,7 +23,7 @@ import java.net.URL
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
@@ -147,7 +147,7 @@ private[spark] class Executor(
 
   /** Returns the total amount of time this JVM process has spent in garbage collection. */
   private def computeTotalGcTime(): Long = {
-    ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
+    ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
   }
 
   class TaskRunner(
@@ -425,7 +425,7 @@ private[spark] class Executor(
     val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
     val curGCTime = computeTotalGcTime()
 
-    for (taskRunner <- runningTasks.values()) {
+    for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.foreach { metrics =>
           metrics.updateShuffleReadMetrics()

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 293c512..d16f4a1 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -19,7 +19,7 @@ package org.apache.spark.executor
 
 import java.util.concurrent.ThreadPoolExecutor
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.hadoop.fs.FileSystem
@@ -30,7 +30,7 @@ private[spark]
 class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
 
   private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
-    FileSystem.getAllStatistics().find(s => s.getScheme.equals(scheme))
+    FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme))
 
   private def registerFileSystemStat[T](
         scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index cfd672e..0474fd2 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.executor
 
 import java.nio.ByteBuffer
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
@@ -28,7 +28,7 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.cluster.mesos.{MesosTaskLaunchData}
+import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
 import org.apache.spark.util.{SignalLogger, Utils}
 
 private[spark] class MesosExecutorBackend
@@ -55,7 +55,7 @@ private[spark] class MesosExecutorBackend
       slaveInfo: SlaveInfo) {
 
     // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
-    val cpusPerTask = executorInfo.getResourcesList
+    val cpusPerTask = executorInfo.getResourcesList.asScala
       .find(_.getName == "cpus")
       .map(_.getScalar.getValue.toInt)
       .getOrElse(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 6cda777..a5ad472 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.input
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
@@ -44,12 +44,9 @@ private[spark] abstract class StreamFileInputFormat[T]
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val files = listStatus(context)
-    val totalLen = files.map { file =>
-      if (file.isDir) 0L else file.getLen
-    }.sum
-
-    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    val files = listStatus(context).asScala
+    val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index aaef7c7..1ba34a1 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.input
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
@@ -52,10 +52,8 @@ private[spark] class WholeTextFileInputFormat
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(context: JobContext, minPartitions: Int) {
-    val files = listStatus(context)
-    val totalLen = files.map { file =>
-      if (file.isDir) 0L else file.getLen
-    }.sum
+    val files = listStatus(context).asScala
+    val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
     super.setMaxSplitSize(maxSplitSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 9be9872..0c09665 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.launcher
 import java.io.File
 import java.util.{HashMap => JHashMap, List => JList, Map => JMap}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 import org.apache.spark.deploy.Command
 
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.Command
 private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command)
     extends AbstractCommandBuilder {
 
-  childEnv.putAll(command.environment)
+  childEnv.putAll(command.environment.asJava)
   childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome)
 
   override def buildCommand(env: JMap[String, String]): JList[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index d749555..dd2d325 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -20,6 +20,7 @@ package org.apache.spark.metrics
 import java.io.{FileInputStream, InputStream}
 import java.util.Properties
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.matching.Regex
 
@@ -58,25 +59,20 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
 
     propertyCategories = subProperties(properties, INSTANCE_REGEX)
     if (propertyCategories.contains(DEFAULT_PREFIX)) {
-      import scala.collection.JavaConversions._
-
-      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
-      for { (inst, prop) <- propertyCategories
-            if (inst != DEFAULT_PREFIX)
-            (k, v) <- defaultProperty
-            if (prop.getProperty(k) == null) } {
-        prop.setProperty(k, v)
+      val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala
+      for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX);
+          (k, v) <- defaultProperty if (prop.get(k) == null)) {
+        prop.put(k, v)
       }
     }
   }
 
   def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
     val subProperties = new mutable.HashMap[String, Properties]
-    import scala.collection.JavaConversions._
-    prop.foreach { kv =>
-      if (regex.findPrefixOf(kv._1).isDefined) {
-        val regex(prefix, suffix) = kv._1
-        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+    prop.asScala.foreach { kv =>
+      if (regex.findPrefixOf(kv._1.toString).isDefined) {
+        val regex(prefix, suffix) = kv._1.toString
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
       }
     }
     subProperties
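
Properties.asScala gives a mutable.Map[String, String] view over the
underlying java.util.Properties, so entries can be filtered and copied
without the old implicit conversions. A standalone sketch with made-up keys:

    import java.util.Properties
    import scala.collection.JavaConverters._

    val props = new Properties()
    props.setProperty("sink.console.period", "10")
    // The view reflects the underlying Properties; keys and values are Strings.
    props.asScala.foreach { case (k, v) => println(s"$k = $v") }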

