Repository: spark
Updated Branches:
  refs/heads/master 94e2064fa -> f6d06adf0


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 341f56d..b92a302 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -33,7 +33,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.executor.{TaskMetrics, ShuffleWriteMetrics}
-import org.apache.spark.serializer.{SerializerInstance, Serializer, JavaSerializer}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.serializer.{JavaSerializer, SerializerInstance}
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
 
@@ -42,25 +43,31 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
   @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
   @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
   @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency: ShuffleDependency[Int, Int, Int] = _
 
   private var taskMetrics: TaskMetrics = _
-  private var shuffleWriteMetrics: ShuffleWriteMetrics = _
   private var tempDir: File = _
   private var outputFile: File = _
   private val conf: SparkConf = new SparkConf(loadDefaults = false)
   private val temporaryFilesCreated: mutable.Buffer[File] = new ArrayBuffer[File]()
   private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File]
-  private val shuffleBlockId: ShuffleBlockId = new ShuffleBlockId(0, 0, 0)
-  private val serializer: Serializer = new JavaSerializer(conf)
+  private var shuffleHandle: BypassMergeSortShuffleHandle[Int, Int] = _
 
   override def beforeEach(): Unit = {
     tempDir = Utils.createTempDir()
     outputFile = File.createTempFile("shuffle", null, tempDir)
-    shuffleWriteMetrics = new ShuffleWriteMetrics
     taskMetrics = new TaskMetrics
-    taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics)
     MockitoAnnotations.initMocks(this)
+    shuffleHandle = new BypassMergeSortShuffleHandle[Int, Int](
+      shuffleId = 0,
+      numMaps = 2,
+      dependency = dependency
+    )
+    when(dependency.partitioner).thenReturn(new HashPartitioner(7))
+    when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf)))
     when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
     when(blockManager.getDiskWriter(
       any[BlockId],
@@ -107,18 +114,20 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
 
   test("write empty iterator") {
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
-      new SparkConf(loadDefaults = false),
       blockManager,
-      new HashPartitioner(7),
-      shuffleWriteMetrics,
-      serializer
+      blockResolver,
+      shuffleHandle,
+      0, // MapId
+      taskContext,
+      conf
     )
-    writer.insertAll(Iterator.empty)
-    val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile)
-    assert(partitionLengths.sum === 0)
+    writer.write(Iterator.empty)
+    writer.stop( /* success = */ true)
+    assert(writer.getPartitionLengths.sum === 0)
     assert(outputFile.exists())
     assert(outputFile.length() === 0)
     assert(temporaryFilesCreated.isEmpty)
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
     assert(shuffleWriteMetrics.shuffleBytesWritten === 0)
     assert(shuffleWriteMetrics.shuffleRecordsWritten === 0)
     assert(taskMetrics.diskBytesSpilled === 0)
@@ -129,17 +138,19 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     def records: Iterator[(Int, Int)] =
       Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
-      new SparkConf(loadDefaults = false),
       blockManager,
-      new HashPartitioner(7),
-      shuffleWriteMetrics,
-      serializer
+      blockResolver,
+      shuffleHandle,
+      0, // MapId
+      taskContext,
+      conf
     )
-    writer.insertAll(records)
+    writer.write(records)
+    writer.stop( /* success = */ true)
     assert(temporaryFilesCreated.nonEmpty)
-    val partitionLengths = writer.writePartitionedFile(shuffleBlockId, taskContext, outputFile)
-    assert(partitionLengths.sum === outputFile.length())
+    assert(writer.getPartitionLengths.sum === outputFile.length())
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
     assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
     assert(shuffleWriteMetrics.shuffleRecordsWritten === records.length)
     assert(taskMetrics.diskBytesSpilled === 0)
@@ -148,14 +159,15 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
 
   test("cleanup of intermediate files after errors") {
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
-      new SparkConf(loadDefaults = false),
       blockManager,
-      new HashPartitioner(7),
-      shuffleWriteMetrics,
-      serializer
+      blockResolver,
+      shuffleHandle,
+      0, // MapId
+      taskContext,
+      conf
     )
     intercept[SparkException] {
-      writer.insertAll((0 until 100000).iterator.map(i => {
+      writer.write((0 until 100000).iterator.map(i => {
         if (i == 99990) {
           throw new SparkException("Intentional failure")
         }
@@ -163,7 +175,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       }))
     }
     assert(temporaryFilesCreated.nonEmpty)
-    writer.stop()
+    writer.stop( /* success = */ false)
     assert(temporaryFilesCreated.count(_.exists()) === 0)
   }
 

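For reference, the writer lifecycle that the updated suite drives looks roughly like the sketch below. It is not part of the change: the collaborators (block manager, block resolver, shuffle handle, task context) are assumed to be wired up elsewhere, e.g. as the Mockito mocks configured in beforeEach, the helper name is hypothetical, and the object lives in the same package as the suite so package-visible classes resolve.

package org.apache.spark.shuffle.sort

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.BlockManager

object BypassWriterLifecycleSketch {
  // Drives a BypassMergeSortShuffleWriter through write/stop and returns the
  // per-partition lengths that back the shuffle index file.
  def runBypassWriter(
      blockManager: BlockManager,
      blockResolver: IndexShuffleBlockResolver,
      handle: BypassMergeSortShuffleHandle[Int, Int],
      taskContext: TaskContext,
      conf: SparkConf,
      records: Iterator[(Int, Int)]): Array[Long] = {
    val writer = new BypassMergeSortShuffleWriter[Int, Int](
      blockManager, blockResolver, handle, 0 /* mapId */, taskContext, conf)
    writer.write(records)              // partitions records into per-reducer spill files
    writer.stop( /* success = */ true) // concatenates them into the single output data file
    writer.getPartitionLengths         // lengths recorded alongside the data file
  }
}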
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
new file mode 100644
index 0000000..8744a07
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.Matchers
+
+import org.apache.spark._
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
+
+/**
+ * Tests for the fallback logic in SortShuffleManager. Actual tests of shuffling data are
+ * performed in other suites.
+ */
+class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
+
+  import SortShuffleManager.canUseSerializedShuffle
+
+  private class RuntimeExceptionAnswer extends Answer[Object] {
+    override def answer(invocation: InvocationOnMock): Object = {
+      throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName)
+    }
+  }
+
+  private def shuffleDep(
+      partitioner: Partitioner,
+      serializer: Option[Serializer],
+      keyOrdering: Option[Ordering[Any]],
+      aggregator: Option[Aggregator[Any, Any, Any]],
+      mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
+    val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer())
+    doReturn(0).when(dep).shuffleId
+    doReturn(partitioner).when(dep).partitioner
+    doReturn(serializer).when(dep).serializer
+    doReturn(keyOrdering).when(dep).keyOrdering
+    doReturn(aggregator).when(dep).aggregator
+    doReturn(mapSideCombine).when(dep).mapSideCombine
+    dep
+  }
+
+  test("supported shuffle dependencies for serialized shuffle") {
+    val kryo = Some(new KryoSerializer(new SparkConf()))
+
+    assert(canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]])
+    when(rangePartitioner.numPartitions).thenReturn(2)
+    assert(canUseSerializedShuffle(shuffleDep(
+      partitioner = rangePartitioner,
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    // Shuffles with key orderings are supported as long as no aggregator is specified
+    assert(canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+  }
+
+  test("unsupported shuffle dependencies for serialized shuffle") {
+    val kryo = Some(new KryoSerializer(new SparkConf()))
+    val java = Some(new JavaSerializer(new SparkConf()))
+
+    // We only support serializers that support object relocation
+    assert(!canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = java,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    // The serialized shuffle path does not support shuffles with more than 16 million output
+    // partitions, due to a limitation in its sorter implementation.
+    assert(!canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(
+        SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = None,
+      mapSideCombine = false
+    )))
+
+    // We do not support shuffles that perform aggregation
+    assert(!canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
+      mapSideCombine = false
+    )))
+    assert(!canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = Some(mock(classOf[Ordering[Any]])),
+      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
+      mapSideCombine = true
+    )))
+  }
+
+}

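Restated in code, the rules this new suite pins down look roughly like the sketch below. It is a paraphrase of what the assertions check, not the actual SortShuffleManager.canUseSerializedShuffle implementation; the object name is hypothetical, and it sits in the same package so that the private[spark] serializer capability flag is visible.

package org.apache.spark.shuffle.sort

import org.apache.spark.ShuffleDependency

private[spark] object SerializedShuffleRulesSketch {
  // Serialized (formerly "unsafe") shuffle requires a serializer that supports
  // relocation of serialized objects, no map-side aggregation, and a partition
  // count within the serialized sorter's limit. A key ordering alone is fine.
  def canUse(dep: ShuffleDependency[_, _, _]): Boolean = {
    dep.serializer.exists(_.supportsRelocationOfSerializedObjects) &&
      dep.aggregator.isEmpty &&
      !dep.mapSideCombine &&
      dep.partitioner.numPartitions <=
        SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
  }
}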
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
deleted file mode 100644
index 34b4984..0000000
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.sort
-
-import org.mockito.Mockito._
-
-import org.apache.spark.{Aggregator, SparkConf, SparkFunSuite}
-
-class SortShuffleWriterSuite extends SparkFunSuite {
-
-  import SortShuffleWriter._
-
-  test("conditions for bypassing merge-sort") {
-    val conf = new SparkConf(loadDefaults = false)
-    val agg = mock(classOf[Aggregator[_, _, _]], RETURNS_SMART_NULLS)
-    val ord = implicitly[Ordering[Int]]
-
-    // Numbers of partitions that are above and below the default bypassMergeThreshold
-    val FEW_PARTITIONS = 50
-    val MANY_PARTITIONS = 10000
-
-    // Shuffles with no ordering or aggregator: should bypass unless # of partitions is high
-    assert(shouldBypassMergeSort(conf, FEW_PARTITIONS, None, None))
-    assert(!shouldBypassMergeSort(conf, MANY_PARTITIONS, None, None))
-
-    // Shuffles with an ordering or aggregator: should not bypass even if they have few partitions
-    assert(!shouldBypassMergeSort(conf, FEW_PARTITIONS, None, Some(ord)))
-    assert(!shouldBypassMergeSort(conf, FEW_PARTITIONS, Some(agg), None))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
deleted file mode 100644
index 6727934..0000000
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe
-
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.Matchers
-
-import org.apache.spark._
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
-
-/**
- * Tests for the fallback logic in UnsafeShuffleManager. Actual tests of shuffling data are
- * performed in other suites.
- */
-class UnsafeShuffleManagerSuite extends SparkFunSuite with Matchers {
-
-  import UnsafeShuffleManager.canUseUnsafeShuffle
-
-  private class RuntimeExceptionAnswer extends Answer[Object] {
-    override def answer(invocation: InvocationOnMock): Object = {
-      throw new RuntimeException("Called non-stubbed method, " + invocation.getMethod.getName)
-    }
-  }
-
-  private def shuffleDep(
-      partitioner: Partitioner,
-      serializer: Option[Serializer],
-      keyOrdering: Option[Ordering[Any]],
-      aggregator: Option[Aggregator[Any, Any, Any]],
-      mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
-    val dep = mock(classOf[ShuffleDependency[Any, Any, Any]], new RuntimeExceptionAnswer())
-    doReturn(0).when(dep).shuffleId
-    doReturn(partitioner).when(dep).partitioner
-    doReturn(serializer).when(dep).serializer
-    doReturn(keyOrdering).when(dep).keyOrdering
-    doReturn(aggregator).when(dep).aggregator
-    doReturn(mapSideCombine).when(dep).mapSideCombine
-    dep
-  }
-
-  test("supported shuffle dependencies") {
-    val kryo = Some(new KryoSerializer(new SparkConf()))
-
-    assert(canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = None,
-      aggregator = None,
-      mapSideCombine = false
-    )))
-
-    val rangePartitioner = mock(classOf[RangePartitioner[Any, Any]])
-    when(rangePartitioner.numPartitions).thenReturn(2)
-    assert(canUseUnsafeShuffle(shuffleDep(
-      partitioner = rangePartitioner,
-      serializer = kryo,
-      keyOrdering = None,
-      aggregator = None,
-      mapSideCombine = false
-    )))
-
-    // Shuffles with key orderings are supported as long as no aggregator is specified
-    assert(canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = None,
-      mapSideCombine = false
-    )))
-
-  }
-
-  test("unsupported shuffle dependencies") {
-    val kryo = Some(new KryoSerializer(new SparkConf()))
-    val java = Some(new JavaSerializer(new SparkConf()))
-
-    // We only support serializers that support object relocation
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = java,
-      keyOrdering = None,
-      aggregator = None,
-      mapSideCombine = false
-    )))
-
-    // We do not support shuffles with more than 16 million output partitions
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS + 1),
-      serializer = kryo,
-      keyOrdering = None,
-      aggregator = None,
-      mapSideCombine = false
-    )))
-
-    // We do not support shuffles that perform aggregation
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = None,
-      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
-      mapSideCombine = false
-    )))
-    assert(!canUseUnsafeShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = Some(mock(classOf[Ordering[Any]])),
-      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
-      mapSideCombine = true
-    )))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
deleted file mode 100644
index 259020a..0000000
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.TrueFileFilter
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkContext, ShuffleSuite}
-import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.util.Utils
-
-class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // This test suite should run all tests in ShuffleSuite with unsafe-based shuffle.
-
-  override def beforeAll() {
-    conf.set("spark.shuffle.manager", "tungsten-sort")
-  }
-
-  test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") {
-    val tmpDir = Utils.createTempDir()
-    try {
-      val myConf = conf.clone()
-        .set("spark.local.dir", tmpDir.getAbsolutePath)
-      sc = new SparkContext("local", "test", myConf)
-      // Create a shuffled RDD and verify that it will actually use the new UnsafeShuffle path
-      val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
-      val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
-        .setSerializer(new KryoSerializer(myConf))
-      val shuffleDep = shuffledRdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-      assert(UnsafeShuffleManager.canUseUnsafeShuffle(shuffleDep))
-      def getAllFiles: Set[File] =
-        FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
-      val filesBeforeShuffle = getAllFiles
-      // Force the shuffle to be performed
-      shuffledRdd.count()
-      // Ensure that the shuffle actually created files that will need to be cleaned up
-      val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
-      filesCreatedByShuffle.map(_.getName) should be
-        Set("shuffle_0_0_0.data", "shuffle_0_0_0.index")
-      // Check that the cleanup actually removes the files
-      sc.env.blockManager.master.removeShuffle(0, blocking = true)
-      for (file <- filesCreatedByShuffle) {
-        assert (!file.exists(), s"Shuffle file $file was not cleaned up")
-      }
-    } finally {
-      Utils.deleteRecursively(tmpDir)
-    }
-  }
-
-  test("UnsafeShuffleManager properly cleans up files for shuffles that use the old shuffle path") {
-    val tmpDir = Utils.createTempDir()
-    try {
-      val myConf = conf.clone()
-        .set("spark.local.dir", tmpDir.getAbsolutePath)
-      sc = new SparkContext("local", "test", myConf)
-      // Create a shuffled RDD and verify that it will actually use the old SortShuffle path
-      val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
-      val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
-        .setSerializer(new JavaSerializer(myConf))
-      val shuffleDep = shuffledRdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-      assert(!UnsafeShuffleManager.canUseUnsafeShuffle(shuffleDep))
-      def getAllFiles: Set[File] =
-        FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
-      val filesBeforeShuffle = getAllFiles
-      // Force the shuffle to be performed
-      shuffledRdd.count()
-      // Ensure that the shuffle actually created files that will need to be cleaned up
-      val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
-      filesCreatedByShuffle.map(_.getName) should be
-        Set("shuffle_0_0_0.data", "shuffle_0_0_0.index")
-      // Check that the cleanup actually removes the files
-      sc.env.blockManager.master.removeShuffle(0, blocking = true)
-      for (file <- filesCreatedByShuffle) {
-        assert (!file.exists(), s"Shuffle file $file was not cleaned up")
-      }
-    } finally {
-      Utils.deleteRecursively(tmpDir)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala
deleted file mode 100644
index 05306f4..0000000
--- a/core/src/test/scala/org/apache/spark/util/collection/ChainedBufferSuite.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.collection
-
-import java.nio.ByteBuffer
-
-import org.scalatest.Matchers._
-
-import org.apache.spark.SparkFunSuite
-
-class ChainedBufferSuite extends SparkFunSuite {
-  test("write and read at start") {
-    // write from start of source array
-    val buffer = new ChainedBuffer(8)
-    buffer.capacity should be (0)
-    verifyWriteAndRead(buffer, 0, 0, 0, 4)
-    buffer.capacity should be (8)
-
-    // write from middle of source array
-    verifyWriteAndRead(buffer, 0, 5, 0, 4)
-    buffer.capacity should be (8)
-
-    // read to middle of target array
-    verifyWriteAndRead(buffer, 0, 0, 5, 4)
-    buffer.capacity should be (8)
-
-    // write up to border
-    verifyWriteAndRead(buffer, 0, 0, 0, 8)
-    buffer.capacity should be (8)
-
-    // expand into second buffer
-    verifyWriteAndRead(buffer, 0, 0, 0, 12)
-    buffer.capacity should be (16)
-
-    // expand into multiple buffers
-    verifyWriteAndRead(buffer, 0, 0, 0, 28)
-    buffer.capacity should be (32)
-  }
-
-  test("write and read at middle") {
-    val buffer = new ChainedBuffer(8)
-
-    // fill to a middle point
-    verifyWriteAndRead(buffer, 0, 0, 0, 3)
-
-    // write from start of source array
-    verifyWriteAndRead(buffer, 3, 0, 0, 4)
-    buffer.capacity should be (8)
-
-    // write from middle of source array
-    verifyWriteAndRead(buffer, 3, 5, 0, 4)
-    buffer.capacity should be (8)
-
-    // read to middle of target array
-    verifyWriteAndRead(buffer, 3, 0, 5, 4)
-    buffer.capacity should be (8)
-
-    // write up to border
-    verifyWriteAndRead(buffer, 3, 0, 0, 5)
-    buffer.capacity should be (8)
-
-    // expand into second buffer
-    verifyWriteAndRead(buffer, 3, 0, 0, 12)
-    buffer.capacity should be (16)
-
-    // expand into multiple buffers
-    verifyWriteAndRead(buffer, 3, 0, 0, 28)
-    buffer.capacity should be (32)
-  }
-
-  test("write and read at later buffer") {
-    val buffer = new ChainedBuffer(8)
-
-    // fill to a middle point
-    verifyWriteAndRead(buffer, 0, 0, 0, 11)
-
-    // write from start of source array
-    verifyWriteAndRead(buffer, 11, 0, 0, 4)
-    buffer.capacity should be (16)
-
-    // write from middle of source array
-    verifyWriteAndRead(buffer, 11, 5, 0, 4)
-    buffer.capacity should be (16)
-
-    // read to middle of target array
-    verifyWriteAndRead(buffer, 11, 0, 5, 4)
-    buffer.capacity should be (16)
-
-    // write up to border
-    verifyWriteAndRead(buffer, 11, 0, 0, 5)
-    buffer.capacity should be (16)
-
-    // expand into second buffer
-    verifyWriteAndRead(buffer, 11, 0, 0, 12)
-    buffer.capacity should be (24)
-
-    // expand into multiple buffers
-    verifyWriteAndRead(buffer, 11, 0, 0, 28)
-    buffer.capacity should be (40)
-  }
-
-
-  // Used to make sure we're writing different bytes each time
-  var rangeStart = 0
-
-  /**
-   * @param buffer The buffer to write to and read from.
-   * @param offsetInBuffer The offset to write to in the buffer.
-   * @param offsetInSource The offset in the array that the bytes are written from.
-   * @param offsetInTarget The offset in the array to read the bytes into.
-   * @param length The number of bytes to read and write
-   */
-  def verifyWriteAndRead(
-      buffer: ChainedBuffer,
-      offsetInBuffer: Int,
-      offsetInSource: Int,
-      offsetInTarget: Int,
-      length: Int): Unit = {
-    val source = new Array[Byte](offsetInSource + length)
-    (rangeStart until rangeStart + length).map(_.toByte).copyToArray(source, offsetInSource)
-    buffer.write(offsetInBuffer, source, offsetInSource, length)
-    val target = new Array[Byte](offsetInTarget + length)
-    buffer.read(offsetInBuffer, target, offsetInTarget, length)
-    ByteBuffer.wrap(source, offsetInSource, length) should be
-      (ByteBuffer.wrap(target, offsetInTarget, length))
-
-    rangeStart += 100
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
deleted file mode 100644
index 3b67f62..0000000
--- a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.collection
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import com.google.common.io.ByteStreams
-
-import org.mockito.Matchers.any
-import org.mockito.Mockito._
-import org.mockito.Mockito.RETURNS_SMART_NULLS
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.Matchers._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.storage.DiskBlockObjectWriter
-
-class PartitionedSerializedPairBufferSuite extends SparkFunSuite {
-  test("OrderedInputStream single record") {
-    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
-
-    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
-    val struct = SomeStruct("something", 5)
-    buffer.insert(4, 10, struct)
-
-    val bytes = ByteStreams.toByteArray(buffer.orderedInputStream)
-
-    val baos = new ByteArrayOutputStream()
-    val stream = serializerInstance.serializeStream(baos)
-    stream.writeObject(10)
-    stream.writeObject(struct)
-    stream.close()
-
-    baos.toByteArray should be (bytes)
-  }
-
-  test("insert single record") {
-    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
-    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
-    val struct = SomeStruct("something", 5)
-    buffer.insert(4, 10, struct)
-    val elements = buffer.partitionedDestructiveSortedIterator(None).toArray
-    elements.size should be (1)
-    elements.head should be (((4, 10), struct))
-  }
-
-  test("insert multiple records") {
-    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
-    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
-    val struct1 = SomeStruct("something1", 8)
-    buffer.insert(6, 1, struct1)
-    val struct2 = SomeStruct("something2", 9)
-    buffer.insert(4, 2, struct2)
-    val struct3 = SomeStruct("something3", 10)
-    buffer.insert(5, 3, struct3)
-
-    val elements = buffer.partitionedDestructiveSortedIterator(None).toArray
-    elements.size should be (3)
-    elements(0) should be (((4, 2), struct2))
-    elements(1) should be (((5, 3), struct3))
-    elements(2) should be (((6, 1), struct1))
-  }
-
-  test("write single record") {
-    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
-    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
-    val struct = SomeStruct("something", 5)
-    buffer.insert(4, 10, struct)
-    val it = buffer.destructiveSortedWritablePartitionedIterator(None)
-    val (writer, baos) = createMockWriter()
-    assert(it.hasNext)
-    it.nextPartition should be (4)
-    it.writeNext(writer)
-    assert(!it.hasNext)
-
-    val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
-    stream.readObject[AnyRef]() should be (10)
-    stream.readObject[AnyRef]() should be (struct)
-  }
-
-  test("write multiple records") {
-    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
-    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
-    val struct1 = SomeStruct("something1", 8)
-    buffer.insert(6, 1, struct1)
-    val struct2 = SomeStruct("something2", 9)
-    buffer.insert(4, 2, struct2)
-    val struct3 = SomeStruct("something3", 10)
-    buffer.insert(5, 3, struct3)
-
-    val it = buffer.destructiveSortedWritablePartitionedIterator(None)
-    val (writer, baos) = createMockWriter()
-    assert(it.hasNext)
-    it.nextPartition should be (4)
-    it.writeNext(writer)
-    assert(it.hasNext)
-    it.nextPartition should be (5)
-    it.writeNext(writer)
-    assert(it.hasNext)
-    it.nextPartition should be (6)
-    it.writeNext(writer)
-    assert(!it.hasNext)
-
-    val stream = serializerInstance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
-    val iter = stream.asIterator
-    iter.next() should be (2)
-    iter.next() should be (struct2)
-    iter.next() should be (3)
-    iter.next() should be (struct3)
-    iter.next() should be (1)
-    iter.next() should be (struct1)
-    assert(!iter.hasNext)
-  }
-
-  def createMockWriter(): (DiskBlockObjectWriter, ByteArrayOutputStream) = {
-    val writer = mock(classOf[DiskBlockObjectWriter], RETURNS_SMART_NULLS)
-    val baos = new ByteArrayOutputStream()
-    when(writer.write(any(), any(), any())).thenAnswer(new Answer[Unit] {
-      override def answer(invocationOnMock: InvocationOnMock): Unit = {
-        val args = invocationOnMock.getArguments
-        val bytes = args(0).asInstanceOf[Array[Byte]]
-        val offset = args(1).asInstanceOf[Int]
-        val length = args(2).asInstanceOf[Int]
-        baos.write(bytes, offset, length)
-      }
-    })
-    (writer, baos)
-  }
-}
-
-case class SomeStruct(str: String, num: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 46d92ce..be9c36b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -437,12 +437,9 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.manager</code></td>
   <td>sort</td>
   <td>
-    Implementation to use for shuffling data. There are three implementations available:
-    <code>sort</code>, <code>hash</code> and the new (1.5+) <code>tungsten-sort</code>.
+    Implementation to use for shuffling data. There are two implementations available:
+    <code>sort</code> and <code>hash</code>.
     Sort-based shuffle is more memory-efficient and is the default option starting in 1.2.
-    Tungsten-sort is similar to the sort based shuffle, with a direct binary cache-friendly
-    implementation with a fall back to regular sort based shuffle if its requirements are not
-    met.
   </td>
 </tr>
 <tr>

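For illustration, selecting the shuffle implementation is a plain SparkConf setting; a minimal sketch (the app name and master below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleManagerConfigExample {
  def main(args: Array[String]): Unit = {
    // "sort" is the documented default; "hash" remains available as the alternative.
    val conf = new SparkConf()
      .setAppName("shuffle-config-example") // placeholder app name
      .setMaster("local")                   // placeholder master
      .set("spark.shuffle.manager", "sort")
    val sc = new SparkContext(conf)
    try {
      // reduceByKey forces a shuffle through the configured manager
      sc.parallelize(1 to 10).map(x => (x % 2, x)).reduceByKey(_ + _).collect()
    } finally {
      sc.stop()
    }
  }
}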
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0872d3f..b5e661d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,7 @@ object MimaExcludes {
       Seq(
         MimaBuild.excludeSparkPackage("deploy"),
         MimaBuild.excludeSparkPackage("network"),
+        MimaBuild.excludeSparkPackage("unsafe"),
        // These are needed if checking against the sbt build, since they are part of
         // the maven-generated artifacts in 1.3.
         excludePackage("org.spark-project.jetty"),
@@ -44,7 +45,11 @@ object MimaExcludes {
         // SQL execution is considered private.
         excludePackage("org.apache.spark.sql.execution"),
         // SQL columnar is considered private.
-        excludePackage("org.apache.spark.sql.columnar")
+        excludePackage("org.apache.spark.sql.columnar"),
+        // The shuffle package is considered private.
+        excludePackage("org.apache.spark.shuffle"),
+        // The collections utilities are considered private.
+        excludePackage("org.apache.spark.util.collection")
       ) ++
       MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
       MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
@@ -750,4 +755,4 @@ object MimaExcludes {
       MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
     case _ => Seq()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 1d3379a..7f60c8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -87,10 +86,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
     // fewer partitions (like RangePartitioner, for example).
     val conf = child.sqlContext.sparkContext.conf
     val shuffleManager = SparkEnv.get.shuffleManager
-    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
-      shuffleManager.isInstanceOf[UnsafeShuffleManager]
+    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
-    val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
     if (sortBasedShuffleOn) {
       val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
       if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
@@ -99,22 +96,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
-        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
-        // them. This optimization is guarded by a feature-flag and is only applied in cases where
-        // shuffle dependency does not specify an aggregator or ordering and the record serializer
-        // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+      } else if (serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+        // prior to sorting them. This optimization is only applied in cases where shuffle
+        // dependency does not specify an aggregator or ordering and the record serializer has
+        // certain properties. If this optimization is enabled, we can safely avoid the copy.
         //
         // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
         // need to check whether the optimization is enabled and supported by our serializer.
-        //
-        // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
         false
       } else {
-        // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
-        // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
-        // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
-        // both cases, we must copy.
+        // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
+        // copy.
         true
       }
     } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {

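Condensed, the sort-shuffle branch above reduces to the decision sketched below; the helper and its parameters are stand-ins for the SparkEnv and serializer plumbing, not the actual Exchange code.

object ExchangeCopyDecisionSketch {
  // Returns true when rows must be defensively copied before being handed to the
  // shuffle, mirroring the three cases in the sort-based branch above.
  def needToCopyUnderSortShuffle(
      numPartitions: Int,
      bypassMergeThreshold: Int,
      serializerSupportsRelocation: Boolean): Boolean = {
    if (numPartitions <= bypassMergeThreshold) {
      false // BypassMergeSortShuffleWriter writes records directly, without buffering them
    } else if (serializerSupportsRelocation) {
      false // records are serialized before sorting (SPARK-4550 / SPARK-7081)
    } else {
      true  // ExternalSorter buffers deserialized records in memory, so copies are required
    }
  }
}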
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 75d1fce..1680d7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -101,7 +101,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be overwritten
     Utils.tryWithSafeFinally {
       val conf = new SparkConf()
-        .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+        .set("spark.shuffle.spill.initialMemoryThreshold", "1")
         .set("spark.shuffle.sort.bypassMergeThreshold", "0")
         .set("spark.testing.memory", "80000")
 
@@ -109,7 +109,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
       // prepare data
       val converter = unsafeRowConverter(Array(IntegerType))
-      val data = (1 to 1000).iterator.map { i =>
+      val data = (1 to 10000).iterator.map { i =>
         (i, converter(Row(i)))
       }
       val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
@@ -141,9 +141,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("SPARK-10403: unsafe row serializer with UnsafeShuffleManager") {
-    val conf = new SparkConf()
-      .set("spark.shuffle.manager", "tungsten-sort")
+  test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
+    val conf = new SparkConf().set("spark.shuffle.manager", "sort")
     sc = new SparkContext("local", "test", conf)
     val row = Row("Hello", 123)
     val unsafeRow = toUnsafeRow(row, Array(StringType, IntegerType))

