git commit: SPARK-2738. Remove redundant imports in BlockManagerSuite

2014-08-01 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 149910111 -> cb9e7d5af


SPARK-2738. Remove redundant imports in BlockManagerSuite

Author: Sandy Ryza sa...@cloudera.com

Closes #1642 from sryza/sandy-spark-2738 and squashes the following commits:

a923e4e [Sandy Ryza] SPARK-2738. Remove redundant imports in BlockManagerSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb9e7d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb9e7d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb9e7d5a

Branch: refs/heads/master
Commit: cb9e7d5aff2ce9cb501a2825651224311263ce20
Parents: 1499101
Author: Sandy Ryza sa...@cloudera.com
Authored: Thu Jul 31 23:12:38 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Thu Jul 31 23:12:38 2014 -0700

--
 .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala   | 3 ---
 1 file changed, 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb9e7d5a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index dd4fd53..58ea0cc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -21,9 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
 
 import akka.actor._
-import org.apache.spark.SparkConf
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._



git commit: SPARK-983. Support external sorting in sortByKey()

2014-08-01 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master 8ff4417f7 -> 72e336997


SPARK-983. Support external sorting in sortByKey()

This patch simply uses the ExternalSorter class from sort-based shuffle.
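
As a usage-level sketch (the object and app names below are illustrative, assuming spark-core on the classpath and a local master), the reduce-side sort behind a plain sortByKey() call now goes through ExternalSorter and can spill to disk instead of buffering the whole partition in an array:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pulls in the sortByKey implicits

object SortByKeyExample extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("sortByKey-sketch")
  val sc = new SparkContext(conf)
  // Each reduce task now sorts its partition with ExternalSorter, which may
  // spill to disk when spark.shuffle.spill is enabled (the default).
  val sorted = sc.parallelize(0 until 100000).map(i => (i % 1000, i)).sortByKey()
  println(sorted.take(5).mkString(", "))
  sc.stop()
}
```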

Closes #931 and Closes #1090

Author: Matei Zaharia ma...@databricks.com

Closes #1677 from mateiz/spark-983 and squashes the following commits:

96b3fda [Matei Zaharia] SPARK-983. Support external sorting in sortByKey()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e33699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e33699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e33699

Branch: refs/heads/master
Commit: 72e33699732496fa71e8c8b0de2203b908423fb2
Parents: 8ff4417
Author: Matei Zaharia ma...@databricks.com
Authored: Fri Aug 1 00:16:18 2014 -0700
Committer: Matei Zaharia ma...@databricks.com
Committed: Fri Aug 1 00:16:18 2014 -0700

--
 .../spark/shuffle/hash/HashShuffleReader.scala  | 22 +---
 .../util/collection/ExternalSorterSuite.scala   | 10 +
 2 files changed, 20 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72e33699/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index e32ad9c..7c9dc8e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.hash
 import org.apache.spark.{InterruptibleIterator, TaskContext}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
+import org.apache.spark.util.collection.ExternalSorter
 
 private[spark] class HashShuffleReader[K, C](
 handle: BaseShuffleHandle[K, _, C],
@@ -35,8 +36,8 @@ private[spark] class HashShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, 
startPartition, context,
-  Serializer.getSerializer(dep.serializer))
+val ser = Serializer.getSerializer(dep.serializer)
+val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, 
startPartition, context, ser)
 
 val aggregatedIter: Iterator[Product2[K, C]] = if 
(dep.aggregator.isDefined) {
   if (dep.mapSideCombine) {
@@ -54,16 +55,13 @@ private[spark] class HashShuffleReader[K, C](
 // Sort the output if there is a sort ordering defined.
 dep.keyOrdering match {
   case Some(keyOrd: Ordering[K]) =>
-// Define a Comparator for the whole record based on the key Ordering.
-val cmp = new Ordering[Product2[K, C]] {
-  override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
-keyOrd.compare(o1._1, o2._1)
-  }
-}
-val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
-// TODO: do external sort.
-scala.util.Sorting.quickSort(sortBuffer)(cmp)
-sortBuffer.iterator
+// Create an ExternalSorter to sort the data. Note that if 
spark.shuffle.spill is disabled,
+// the ExternalSorter won't spill to disk.
+val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), 
serializer = Some(ser))
+sorter.write(aggregatedIter)
+context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
+context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
+sorter.iterator
   case None =>
 aggregatedIter
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72e33699/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index ddb5df4..65a71e5 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -190,6 +190,11 @@ class ExternalSorterSuite extends FunSuite with 
LocalSparkContext {
 fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
   }
 }
+
+// sortByKey - should spill ~17 times
+// sortByKey - should spill ~17 times
+val rddE = sc.parallelize(0 until 10).map(i => (i/4, i))
+val resultE = rddE.sortByKey().collect().toSeq
+assert(resultE === (0 until 10).map(i => (i/4, 

git commit: SPARK-2134: Report metrics before application finishes

2014-08-01 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master 72e336997 -> f1957e116


SPARK-2134: Report metrics before application finishes

Author: Rahul Singhal rahul.sing...@guavus.com

Closes #1076 from rahulsinghaliitd/SPARK-2134 and squashes the following 
commits:

15f18b6 [Rahul Singhal] SPARK-2134: Report metrics before application finishes
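
The hunks below all apply the same pattern: call report() on the relevant MetricsSystem right before a component shuts down, so values gathered since the last scheduled poll are not lost. A minimal, self-contained sketch of that pattern against the Codahale metrics library that Spark's sinks wrap (the registry and metric names here are illustrative):

```
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

object ReportBeforeStopSketch extends App {
  val registry = new MetricRegistry()
  registry.counter("tasks.completed").inc(42)

  val reporter = ConsoleReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()
  reporter.start(10, TimeUnit.SECONDS)  // periodic reporting while the app runs

  // ... application work ...

  reporter.report()  // one final report before shutdown -- the step this patch adds
  reporter.stop()
}
```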


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1957e11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1957e11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1957e11

Branch: refs/heads/master
Commit: f1957e11652a537efd40771f843591a4c9341014
Parents: 72e3369
Author: Rahul Singhal rahul.sing...@guavus.com
Authored: Fri Aug 1 00:33:15 2014 -0700
Committer: Matei Zaharia ma...@databricks.com
Committed: Fri Aug 1 00:33:15 2014 -0700

--
 core/src/main/scala/org/apache/spark/SparkContext.scala  | 1 +
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala  | 2 ++
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala  | 1 +
 .../org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 1 +
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 
 core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 4 
 .../main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala   | 4 
 core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala  | 4 
 .../main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala  | 4 
 core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala  | 2 ++
 .../scala/org/apache/spark/metrics/sink/MetricsServlet.scala | 2 ++
 core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 1 +
 .../main/scala/org/apache/spark/metrics/sink/GangliaSink.scala   | 4 
 13 files changed, 34 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b25f081..f5a0549 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -990,6 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
 val dagSchedulerCopy = dagScheduler
 dagScheduler = null
 if (dagSchedulerCopy != null) {
+  env.metricsSystem.report()
   metadataCleaner.cancel()
   cleaner.foreach(_.stop())
   dagSchedulerCopy.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 21f8667..a70ecdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -154,6 +154,8 @@ private[spark] class Master(
   }
 
   override def postStop() {
+masterMetricsSystem.report()
+applicationMetricsSystem.report()
 // prevent the CompleteRecovery message sending to restarted master
 if (recoveryCompletionTask != null) {
   recoveryCompletionTask.cancel()

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce42544..fb5252d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -357,6 +357,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+metricsSystem.report()
 registrationRetryTimer.foreach(_.cancel())
 executors.values.foreach(_.kill())
 drivers.values.foreach(_.kill())

http://git-wip-us.apache.org/repos/asf/spark/blob/f1957e11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 860b47e..af736de 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -88,6 +88,7 @@ private[spark] class 

git commit: [Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent

2014-08-01 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master f1957e116 -> 284771efb


[Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and 
local-n-failures consistent

[SPARK-2557](https://issues.apache.org/jira/browse/SPARK-2557)

Author: Ye Xianjin advance...@gmail.com

Closes #1464 from advancedxy/SPARK-2557 and squashes the following commits:

d844d67 [Ye Xianjin] add local-*-n-failures, bad-local-n, bad-local-n-failures 
test case
3bbc668 [Ye Xianjin] fix LOCAL_N_REGEX regular expression and make 
local_n_failures accept * as all cores on the computer
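
For reference, a small self-contained sketch of how the corrected patterns (copied from the diff below) parse the different master URL forms; the describe helper is illustrative, not Spark's API:

```
object MasterUrlSketch extends App {
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def describe(master: String): String = master match {
    case LOCAL_N_REGEX(threads) =>
      s"local mode, threads = $threads"
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      s"local mode, threads = $threads, maxTaskFailures = $maxFailures"
    case _ =>
      "could not parse master URL"
  }

  Seq("local[*]", "local[4]", "local[*, 2]", "local[2*]").foreach { m =>
    println(s"$m -> ${describe(m)}")
  }
}
```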


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/284771ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/284771ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/284771ef

Branch: refs/heads/master
Commit: 284771efbef2d6b22212afd49dd62732a2cf52a8
Parents: f1957e1
Author: Ye Xianjin advance...@gmail.com
Authored: Fri Aug 1 00:34:39 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Fri Aug 1 00:34:39 2014 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   | 10 ++---
 .../SparkContextSchedulerCreationSuite.scala| 23 
 2 files changed, 30 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f5a0549..0e51356 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1452,9 +1452,9 @@ object SparkContext extends Logging {
   /** Creates a task scheduler based on a given master URL. Extracted for 
testing. */
   private def createTaskScheduler(sc: SparkContext, master: String): 
TaskScheduler = {
 // Regular expression used for local[N] and local[*] master formats
-val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
+val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
 // Regular expression for local[N, maxRetries], used in tests with failing 
tasks
-val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
 // Regular expression for simulating a Spark cluster of [N, cores, memory] 
locally
 val LOCAL_CLUSTER_REGEX = 
"""local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
 // Regular expression for connecting to Spark deploy clusters
@@ -1484,8 +1484,12 @@ object SparkContext extends Logging {
 scheduler
 
   case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+def localCpuCount = Runtime.getRuntime.availableProcessors()
+// local[*, M] means the number of cores on the computer with M 
failures
+// local[N, M] means exactly N threads with M failures
+val threadCount = if (threads == "*") localCpuCount else threads.toInt
 val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = 
true)
-val backend = new LocalBackend(scheduler, threads.toInt)
+val backend = new LocalBackend(scheduler, threadCount)
 scheduler.initialize(backend)
 scheduler
 

http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 67e3be2..4b727e5 100644
--- 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -68,6 +68,15 @@ class SparkContextSchedulerCreationSuite
 }
   }
 
+  test("local-*-n-failures") {
+val sched = createTaskScheduler("local[* ,2]")
+assert(sched.maxTaskFailures === 2)
+sched.backend match {
+  case s: LocalBackend => assert(s.totalCores === 
Runtime.getRuntime.availableProcessors())
+  case _ => fail()
+}
+  }
+
   test("local-n-failures") {
 val sched = createTaskScheduler("local[4, 2]")
 assert(sched.maxTaskFailures === 2)
@@ -77,6 +86,20 @@ class SparkContextSchedulerCreationSuite
 }
   }
 
+  test("bad-local-n") {
+val e = intercept[SparkException] {
+  createTaskScheduler("local[2*]")
+}
+assert(e.getMessage.contains("Could not parse Master URL"))
+  }
+
+  test("bad-local-n-failures") {
+val e = intercept[SparkException] {
+  createTaskScheduler("local[2*,4]")
+}
+assert(e.getMessage.contains("Could not parse Master URL"))
+  }

git commit: [SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue

2014-08-01 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 284771efb -> a32f0fb73


[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix 
reflection issue

This PR changes the Manifest previously used for KafkaInputDStream's Decoder to a 
ClassTag, and also fixes the problem described in 
[SPARK-2103](https://issues.apache.org/jira/browse/SPARK-2103).

The previous Java interface could not actually capture the Decoder's type, so using 
that Manifest to reconstruct the decoder object triggered a reflection exception.
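
As a self-contained illustration of the mechanism the fix relies on (this is not the Kafka receiver itself, which passes a VerifiableProperties argument): a ClassTag context bound exposes the runtime class of a type parameter, which can then be instantiated reflectively.

```
import scala.reflect.{classTag, ClassTag}

object ClassTagReflectionSketch extends App {
  // Build an instance of T from a single String constructor argument, analogous
  // to how the patched KafkaReceiver builds its Decoders from classTag[U].runtimeClass.
  def fromString[T: ClassTag](arg: String): T =
    classTag[T].runtimeClass
      .getConstructor(classOf[String])
      .newInstance(arg)
      .asInstanceOf[T]

  val sb = fromString[StringBuilder]("seed")
  println(sb.append("!").toString)  // prints "seed!"
}
```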

Also, for the other two Java interfaces, ClassTag[String] is unnecessary because 
calling the Scala API already picks up the right implicit ClassTag.

The current Kafka unit tests cannot actually verify these interfaces, so I've tested 
them in both local and distributed settings.

Author: jerryshao saisai.s...@intel.com

Closes #1508 from jerryshao/SPARK-2103 and squashes the following commits:

e90c37b [jerryshao] Add Mima excludes
7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder 
and fix Decoder construct issue when using Java API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a32f0fb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a32f0fb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a32f0fb7

Branch: refs/heads/master
Commit: a32f0fb73a739c56208cafcd9f08618fb6dd8859
Parents: 284771e
Author: jerryshao saisai.s...@intel.com
Authored: Fri Aug 1 04:32:46 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Fri Aug 1 04:32:46 2014 -0700

--
 .../spark/streaming/kafka/KafkaInputDStream.scala   | 14 +++---
 .../apache/spark/streaming/kafka/KafkaUtils.scala   | 16 +---
 project/MimaExcludes.scala  |  7 ++-
 3 files changed, 18 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 38095e8..e20e2c8 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import java.util.Properties
 import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
 class KafkaInputDStream[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
 @transient ssc_ : StreamingContext,
 kafkaParams: Map[String, String],
 topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
 class KafkaReceiver[
   K: ClassTag,
   V: ClassTag,
-  U <: Decoder[_]: Manifest,
-  T <: Decoder[_]: Manifest](
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
 kafkaParams: Map[String, String],
 topics: Map[String, Int],
 storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
   tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
 }
 
-val keyDecoder = 
manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
   .newInstance(consumerConfig.props)
   .asInstanceOf[Decoder[K]]
-val valueDecoder = 
manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
   .newInstance(consumerConfig.props)
   .asInstanceOf[Decoder[V]]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a32f0fb7/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
--
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 86bb91f..48668f7 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -65,7 +65,7 @@ object KafkaUtils {
*in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
-  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: 

[1/2] [SPARK-2179][SQL] A minor refactoring Java data type APIs (2179 follow-up).

2014-08-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 8d338f64c -> c41fdf04f


http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
index d1aa3c8..77353f4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.types.util
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.api.java.types.{DataType => JDataType, StructField 
=> JStructField}
+import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => 
JStructField}
 
 import scala.collection.JavaConverters._
 
@@ -74,37 +74,37 @@ protected[sql] object DataTypeConversions {
* Returns the equivalent DataType in Scala for the given DataType in Java.
*/
   def asScalaDataType(javaDataType: JDataType): DataType = javaDataType match {
-case stringType: org.apache.spark.sql.api.java.types.StringType =>
+case stringType: org.apache.spark.sql.api.java.StringType =>
   StringType
-case binaryType: org.apache.spark.sql.api.java.types.BinaryType =>
+case binaryType: org.apache.spark.sql.api.java.BinaryType =>
   BinaryType
-case booleanType: org.apache.spark.sql.api.java.types.BooleanType =>
+case booleanType: org.apache.spark.sql.api.java.BooleanType =>
   BooleanType
-case timestampType: org.apache.spark.sql.api.java.types.TimestampType =>
+case timestampType: org.apache.spark.sql.api.java.TimestampType =>
   TimestampType
-case decimalType: org.apache.spark.sql.api.java.types.DecimalType =>
+case decimalType: org.apache.spark.sql.api.java.DecimalType =>
   DecimalType
-case doubleType: org.apache.spark.sql.api.java.types.DoubleType =>
+case doubleType: org.apache.spark.sql.api.java.DoubleType =>
   DoubleType
-case floatType: org.apache.spark.sql.api.java.types.FloatType =>
+case floatType: org.apache.spark.sql.api.java.FloatType =>
   FloatType
-case byteType: org.apache.spark.sql.api.java.types.ByteType =>
+case byteType: org.apache.spark.sql.api.java.ByteType =>
   ByteType
-case integerType: org.apache.spark.sql.api.java.types.IntegerType =>
+case integerType: org.apache.spark.sql.api.java.IntegerType =>
   IntegerType
-case longType: org.apache.spark.sql.api.java.types.LongType =>
+case longType: org.apache.spark.sql.api.java.LongType =>
   LongType
-case shortType: org.apache.spark.sql.api.java.types.ShortType =>
+case shortType: org.apache.spark.sql.api.java.ShortType =>
   ShortType
 
-case arrayType: org.apache.spark.sql.api.java.types.ArrayType =>
+case arrayType: org.apache.spark.sql.api.java.ArrayType =>
   ArrayType(asScalaDataType(arrayType.getElementType), 
arrayType.isContainsNull)
-case mapType: org.apache.spark.sql.api.java.types.MapType =>
+case mapType: org.apache.spark.sql.api.java.MapType =>
   MapType(
 asScalaDataType(mapType.getKeyType),
 asScalaDataType(mapType.getValueType),
 mapType.isValueContainsNull)
-case structType: org.apache.spark.sql.api.java.types.StructType =>
+case structType: org.apache.spark.sql.api.java.StructType =>
   StructType(structType.getFields.map(asScalaStructField))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
--
diff --git 
a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
 
b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
index 8ee4591..3c92906 100644
--- 
a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
+++ 
b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
@@ -28,9 +28,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.sql.api.java.types.DataType;
-import org.apache.spark.sql.api.java.types.StructField;
-import org.apache.spark.sql.api.java.types.StructType;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;

http://git-wip-us.apache.org/repos/asf/spark/blob/c41fdf04/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaSideDataTypeConversionSuite.java
--
diff --git 
a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaSideDataTypeConversionSuite.java
 

git commit: [SQL][SPARK-2212]Hash Outer Join

2014-08-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master c41fdf04f -> 4415722e9


[SQL][SPARK-2212]Hash Outer Join

This patch adds support for hash-based outer joins. Currently, outer joins on big 
relations fall back to `BroadcastNestedLoopJoin`, which is extremely slow. 
This PR builds two hash tables, one for each relation in the same partition, 
which greatly reduces the number of table scans.
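
For intuition, here is a minimal, self-contained sketch of the idea (this is not Spark's HashOuterJoin operator): build one hash table per side of the partition keyed on the join key, then emit matched and unmatched rows by probing the two tables once.

```
object HashFullOuterJoinSketch extends App {
  def hashFullOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
      : Seq[(K, (Option[A], Option[B]))] = {
    // One hash table per relation, built in a single pass over each side.
    val leftTable = left.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
    val rightTable = right.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
    (leftTable.keySet ++ rightTable.keySet).toSeq.flatMap { k =>
      val ls = leftTable.getOrElse(k, Seq.empty[A])
      val rs = rightTable.getOrElse(k, Seq.empty[B])
      // An empty side contributes a single None, so unmatched rows are kept.
      val lsOpt: Seq[Option[A]] = if (ls.isEmpty) Seq(None) else ls.map(Some(_))
      val rsOpt: Seq[Option[B]] = if (rs.isEmpty) Seq(None) else rs.map(Some(_))
      for (l <- lsOpt; r <- rsOpt) yield (k, (l, r))
    }
  }

  // Key order may vary; the pairs are (leftValue, rightValue) with None for no match.
  println(hashFullOuterJoin(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y")))
}
```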

Here is the testing code that I used:
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Record(key: String, value: String)

object JoinTablePrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 300).map(i => Record(s"${i % 828193}", s"val_$i")))

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists b")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key STRING, value STRING)
     | ROW FORMAT SERDE
     | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
     | STORED AS RCFILE
   """.stripMargin)
  runSqlHive("""CREATE TABLE b (key STRING, value STRING)
     | ROW FORMAT SERDE
     | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
     | STORED AS RCFILE
   """.stripMargin)
  runSqlHive("""CREATE TABLE result (key STRING, value STRING)
     | ROW FORMAT SERDE
     | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
     | STORED AS RCFILE
   """.stripMargin)

  hql(s"""from records
     | insert into table a
     | select key, value
   """.stripMargin)
  hql(s"""from records
     | insert into table b select key + 10, value
   """.stripMargin)
}

object JoinTablePerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=20")

  val leftOuterJoin = "insert overwrite table result select a.key, b.value from a left outer join b on a.key=b.key"
  val rightOuterJoin = "insert overwrite table result select a.key, b.value from a right outer join b on a.key=b.key"
  val fullOuterJoin = "insert overwrite table result select a.key, b.value from a full outer join b on a.key=b.key"

  val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) :: ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
    ("RightOuterJoin", benchmark(rightOuterJoin)) :: ("RightOuterJoin", benchmark(rightOuterJoin)) ::
    ("FullOuterJoin", benchmark(fullOuterJoin)) :: ("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
  val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain $rightOuterJoin").collect ++
    hql(s"explain $fullOuterJoin").collect
  println(explains.mkString(",\n"))
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val result = hql(cmd)
    val end = System.currentTimeMillis()
    val count = hql("select count(1) from result").collect.mkString()
    ((end - begin), count)
  }
}
```
And the result as shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[  HashOuterJoin [key#95], [key#97], LeftOuter, None],
[   Exchange (HashPartitioning [key#95], 20)],
[HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#97], 20)],
[HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None), 
None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[  HashOuterJoin [key#102], [key#104], RightOuter, None],
[   Exchange (HashPartitioning [key#102], 20)],
[HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#104], 20)],
[HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None), 
None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[  HashOuterJoin [key#109], [key#111], FullOuter, None],
[   Exchange (HashPartitioning [key#109], 20)],
[HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[   Exchange (HashPartitioning [key#111], 20)],
[HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None), 
None]
LeftOuterJoin: took 16072 ms ([300] records)
LeftOuterJoin: took 14394 ms ([300] records)
RightOuterJoin: took 14802 ms ([300] records)
RightOuterJoin: took 14747 ms ([300] records)
FullOuterJoin: took 17715 ms ([600] records)
FullOuterJoin: took 17629 ms ([600] records)
```

Without this PR, the benchmark seems to never finish.

Author: 

git commit: [SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder

2014-08-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 4415722e9 -> 580c7011c


[SPARK-2729] [SQL] Forgot to match Timestamp type in ColumnBuilder

Just a forgotten match case, found after SPARK-2710: TimestampType can be used by a 
SchemaRDD generated from a JDBC ResultSet.
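
A tiny self-contained illustration (not Spark's ColumnBuilder) of why the forgotten case mattered: the builder is selected by pattern matching on a type tag, and a missing case surfaces as a scala.MatchError at runtime.

```
object MissingMatchSketch extends App {
  sealed trait ColType
  case object IntType extends ColType
  case object TimestampType extends ColType

  def builderFor(t: ColType): String = t match {
    case IntType       => "IntColumnBuilder"
    case TimestampType => "TimestampColumnBuilder"  // before the fix this case was missing,
                                                    // so TimestampType threw a MatchError
  }

  println(builderFor(TimestampType))
}
```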

Author: chutium teng@gmail.com

Closes #1636 from chutium/SPARK-2729 and squashes the following commits:

71af77a [chutium] [SPARK-2729] [SQL] added Timestamp in 
NullableColumnAccessorSuite
39cf9f8 [chutium] [SPARK-2729] add Timestamp Type into ColumnBuilder TestSuite, 
ref. #1636
ab6ff97 [chutium] [SPARK-2729] Forgot to match Timestamp type in ColumnBuilder


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/580c7011
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/580c7011
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/580c7011

Branch: refs/heads/master
Commit: 580c7011cab6bc93294b6486e778557216bedb10
Parents: 4415722
Author: chutium teng@gmail.com
Authored: Fri Aug 1 11:31:44 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Aug 1 11:31:44 2014 -0700

--
 .../main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala   | 1 +
 .../apache/spark/sql/columnar/NullableColumnAccessorSuite.scala| 2 +-
 .../org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 74f5630..c416a74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -154,6 +154,7 @@ private[sql] object ColumnBuilder {
   case STRING.typeId  => new StringColumnBuilder
   case BINARY.typeId  => new BinaryColumnBuilder
   case GENERIC.typeId => new GenericColumnBuilder
+  case TIMESTAMP.typeId => new TimestampColumnBuilder
 }).asInstanceOf[ColumnBuilder]
 
 builder.initialize(initialSize, columnName, useCompression)

http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index 35ab14c..3baa6f8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -41,7 +41,7 @@ object TestNullableColumnAccessor {
 class NullableColumnAccessorSuite extends FunSuite {
   import ColumnarTestUtils._
 
-  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, 
GENERIC).foreach {
+  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC, 
TIMESTAMP).foreach {
 testNullableColumnAccessor(_)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/580c7011/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
index d889852..dc813fe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -37,7 +37,7 @@ object TestNullableColumnBuilder {
 class NullableColumnBuilderSuite extends FunSuite {
   import ColumnarTestUtils._
 
-  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, 
GENERIC).foreach {
+  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC, 
TIMESTAMP).foreach {
 testNullableColumnBuilder(_)
   }
 



git commit: SPARK-1612: Fix potential resource leaks

2014-08-01 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master baf9ce1a4 -> f5d9bea20


SPARK-1612: Fix potential resource leaks

JIRA: https://issues.apache.org/jira/browse/SPARK-1612

Move the close statements into a finally block.
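
A minimal, self-contained sketch of the pattern the patch applies to Utils.copyStream (names here are illustrative): do the copy inside try, and close both streams in nested finally blocks so the output stream is still closed even if closing the input stream throws.

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object CopyAndCloseSketch extends App {
  def copyAndClose(in: InputStream, out: OutputStream): Unit = {
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      // Nested finally: out.close() runs even if in.close() throws.
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }

  val out = new ByteArrayOutputStream()
  copyAndClose(new ByteArrayInputStream("hello".getBytes("UTF-8")), out)
  println(out.toString("UTF-8"))  // hello
}
```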

Author: zsxwing zsxw...@gmail.com

Closes #535 from zsxwing/SPARK-1612 and squashes the following commits:

ae52f50 [zsxwing] Update to follow the code style
549ba13 [zsxwing] SPARK-1612: Fix potential resource leaks


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5d9bea2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5d9bea2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5d9bea2

Branch: refs/heads/master
Commit: f5d9bea20e0db22c09c1191ca44a6471de765739
Parents: baf9ce1
Author: zsxwing zsxw...@gmail.com
Authored: Fri Aug 1 13:25:04 2014 -0700
Committer: Matei Zaharia ma...@databricks.com
Committed: Fri Aug 1 13:25:04 2014 -0700

--
 .../scala/org/apache/spark/util/Utils.scala | 35 
 1 file changed, 22 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5d9bea2/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f8fbb3a..30073a8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -286,17 +286,23 @@ private[spark] object Utils extends Logging {
  out: OutputStream,
  closeStreams: Boolean = false)
   {
-val buf = new Array[Byte](8192)
-var n = 0
-while (n != -1) {
-  n = in.read(buf)
-  if (n != -1) {
-out.write(buf, 0, n)
+try {
+  val buf = new Array[Byte](8192)
+  var n = 0
+  while (n != -1) {
+n = in.read(buf)
+if (n != -1) {
+  out.write(buf, 0, n)
+}
+  }
+} finally {
+  if (closeStreams) {
+try {
+  in.close()
+} finally {
+  out.close()
+}
   }
-}
-if (closeStreams) {
-  in.close()
-  out.close()
 }
   }
 
@@ -868,9 +874,12 @@ private[spark] object Utils extends Logging {
 val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
 val stream = new FileInputStream(file)
 
-stream.skip(effectiveStart)
-stream.read(buff)
-stream.close()
+try {
+  stream.skip(effectiveStart)
+  stream.read(buff)
+} finally {
+  stream.close()
+}
 Source.fromBytes(buff).mkString
   }
 



git commit: [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop

2014-08-01 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master f5d9bea20 -> b270309d7


[SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop

Author: joyyoj suns...@gmail.com

Closes #1694 from joyyoj/SPARK-2379 and squashes the following commits:

d73790d [joyyoj] SPARK-2379 Fix the bug that streaming's receiver may fall into 
a dead loop
22e7821 [joyyoj] Merge remote-tracking branch 'apache/master'
3f4a602 [joyyoj] Merge remote-tracking branch 'remotes/apache/master'
f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes 
are not read properly
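
The one-line diff below replaces a recursive call to stop() inside the error handler with a log message. A minimal sketch of the failure mode it removes (this is not the actual ReceiverSupervisor): if stopping fails and the handler responds by stopping again, the receiver keeps re-entering the stop path instead of terminating.

```
object StopLoopSketch extends App {
  // Before: a failing onStop handler triggers another stop attempt, re-entering forever.
  def stopBuggy(onStop: () => Unit): Unit =
    try onStop() catch { case t: Throwable => stopBuggy(onStop) }

  // After: the failure is logged and stopping proceeds no further.
  def stopFixed(onStop: () => Unit): Unit =
    try onStop() catch { case t: Throwable => println("Error stopping receiver: " + t) }

  stopFixed(() => throw new RuntimeException("boom"))   // terminates, just logs
  // stopBuggy(() => throw new RuntimeException("boom"))  // would keep recursing
}
```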


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b270309d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b270309d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b270309d

Branch: refs/heads/master
Commit: b270309d7608fb749e402cd5afd36087446be398
Parents: f5d9bea
Author: joyyoj suns...@gmail.com
Authored: Fri Aug 1 13:41:55 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Fri Aug 1 13:41:55 2014 -0700

--
 .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b270309d/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 09be3a5..1f0244c 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -138,7 +138,7 @@ private[streaming] abstract class ReceiverSupervisor(
   onReceiverStop(message, error)
 } catch {
   case t: Throwable =>
-stop("Error stopping receiver " + streamId, Some(t))
+logError("Error stopping receiver " + streamId + t.getStackTraceString)
 }
   }
 



git commit: [SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop

2014-08-01 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 886508d3b -> 952e0d698


[SPARK-2379] Fix the bug that streaming's receiver may fall into a dead loop

Author: joyyoj suns...@gmail.com

Closes #1694 from joyyoj/SPARK-2379 and squashes the following commits:

d73790d [joyyoj] SPARK-2379 Fix the bug that streaming's receiver may fall into 
a dead loop
22e7821 [joyyoj] Merge remote-tracking branch 'apache/master'
3f4a602 [joyyoj] Merge remote-tracking branch 'remotes/apache/master'
f4660c5 [joyyoj] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes 
are not read properly

(cherry picked from commit b270309d7608fb749e402cd5afd36087446be398)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/952e0d69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/952e0d69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/952e0d69

Branch: refs/heads/branch-1.0
Commit: 952e0d69841b6218e7d1b8b23e7d74a4fcb1b381
Parents: 886508d
Author: joyyoj suns...@gmail.com
Authored: Fri Aug 1 13:41:55 2014 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Fri Aug 1 13:42:16 2014 -0700

--
 .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/952e0d69/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 09be3a5..1f0244c 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -138,7 +138,7 @@ private[streaming] abstract class ReceiverSupervisor(
   onReceiverStop(message, error)
 } catch {
   case t: Throwable =>
-stop("Error stopping receiver " + streamId, Some(t))
+logError("Error stopping receiver " + streamId + t.getStackTraceString)
 }
   }
 



git commit: SPARK-2791: Fix committing, reverting and state tracking in shuffle file consolidation

2014-08-01 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master b270309d7 -> 78f2af582


SPARK-2791: Fix committing, reverting and state tracking in shuffle file 
consolidation

All changes from this PR are by mridulm and are drawn from his work in #1609. 
This patch is intended to fix all major issues related to shuffle file 
consolidation that mridulm found, while minimizing changes to the code, with 
the hope that it may be more easily merged into 1.1.

This patch is **not** intended as a replacement for #1609, which provides many 
additional benefits, including fixes to ExternalAppendOnlyMap, improvements to 
DiskBlockObjectWriter's API, and several new unit tests.

If it is feasible to merge #1609 for the 1.1 deadline, that is a preferable 
option.

Author: Aaron Davidson aa...@databricks.com

Closes #1678 from aarondav/consol and squashes the following commits:

53b3f6d [Aaron Davidson] Correct behavior when writing unopened file
701d045 [Aaron Davidson] Rebase with sort-based shuffle
9160149 [Aaron Davidson] SPARK-2532: Minimal shuffle consolidation fixes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f2af58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f2af58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f2af58

Branch: refs/heads/master
Commit: 78f2af582286b81e6dc9fa9d455ed2b369d933bd
Parents: b270309
Author: Aaron Davidson aa...@databricks.com
Authored: Fri Aug 1 13:57:19 2014 -0700
Committer: Matei Zaharia ma...@databricks.com
Committed: Fri Aug 1 13:57:19 2014 -0700

--
 .../spark/shuffle/hash/HashShuffleWriter.scala  | 14 ++--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  3 +-
 .../spark/storage/BlockObjectWriter.scala   | 53 +++-
 .../spark/storage/ShuffleBlockManager.scala | 28 ---
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  |  6 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 87 +++-
 .../apache/spark/tools/StoragePerfTester.scala  |  5 +-
 8 files changed, 146 insertions(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 1923f7c..45d3b8b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -65,7 +65,8 @@ private[spark] class HashShuffleWriter[K, V](
   }
 
   /** Close this writer, passing along whether the map completed */
-  override def stop(success: Boolean): Option[MapStatus] = {
+  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
+var success = initiallySuccess
 try {
   if (stopping) {
 return None
@@ -73,15 +74,16 @@ private[spark] class HashShuffleWriter[K, V](
   stopping = true
   if (success) {
 try {
-  return Some(commitWritesAndBuildStatus())
+  Some(commitWritesAndBuildStatus())
 } catch {
   case e: Exception =>
+success = false
 revertWrites()
 throw e
 }
   } else {
 revertWrites()
-return None
+None
   }
 } finally {
   // Release the writers back to the shuffle block manager.
@@ -100,8 +102,7 @@ private[spark] class HashShuffleWriter[K, V](
 var totalBytes = 0L
 var totalTime = 0L
 val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
-  writer.commit()
-  writer.close()
+  writer.commitAndClose()
   val size = writer.fileSegment().length
   totalBytes += size
   totalTime += writer.timeWriting()
@@ -120,8 +121,7 @@ private[spark] class HashShuffleWriter[K, V](
   private def revertWrites(): Unit = {
 if (shuffle != null && shuffle.writers != null) {
   for (writer <- shuffle.writers) {
-writer.revertPartialWrites()
-writer.close()
+writer.revertPartialWritesAndClose()
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f2af58/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 42fcd07..9a356d0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ 

git commit: [SPARK-2786][mllib] Python correlations

2014-08-01 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 78f2af582 -> d88e69561


[SPARK-2786][mllib] Python correlations

Author: Doris Xin doris.s@gmail.com

Closes #1713 from dorx/pythonCorrelation and squashes the following commits:

5f1e60c [Doris Xin] reviewer comments.
46ff6eb [Doris Xin] reviewer comments.
ad44085 [Doris Xin] style fix
e69d446 [Doris Xin] fixed missed conflicts.
eb5bf56 [Doris Xin] merge master
cc9f725 [Doris Xin] units passed.
9141a63 [Doris Xin] WIP2
d199f1f [Doris Xin] Moved correlation names into a public object
cd163d6 [Doris Xin] WIP
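
For reference, a usage sketch of the Scala API these Python stubs delegate to (assuming a local SparkContext; the method names assumed here are "pearson", the default, and "spearman"):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

object CorrSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("corr-sketch"))

  // corr(x, y, method): correlation between two RDD[Double]
  val x = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
  val y = sc.parallelize(Seq(2.0, 4.0, 6.0, 8.0))
  println(Statistics.corr(x, y, "pearson"))  // 1.0 for a perfectly linear relationship

  // corr(X, method): correlation matrix of an RDD[Vector]
  val vecs = sc.parallelize(Seq(
    Vectors.dense(1.0, 2.0),
    Vectors.dense(2.0, 4.0),
    Vectors.dense(3.0, 5.0)))
  println(Statistics.corr(vecs, "spearman"))
  sc.stop()
}
```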


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d88e6956
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d88e6956
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d88e6956

Branch: refs/heads/master
Commit: d88e69561367d65e1a2b94527b80a1f65a2cba90
Parents: 78f2af5
Author: Doris Xin doris.s@gmail.com
Authored: Fri Aug 1 15:02:17 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Aug 1 15:02:17 2014 -0700

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  39 ++-
 .../apache/spark/mllib/stat/Statistics.scala|  10 +-
 .../mllib/stat/correlation/Correlation.scala|  49 +
 .../mllib/api/python/PythonMLLibAPISuite.scala  |  21 +++-
 python/pyspark/mllib/_common.py |   6 +-
 python/pyspark/mllib/stat.py| 104 +++
 6 files changed, 199 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d88e6956/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index d2e8ccf..122925d 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -20,13 +20,15 @@ package org.apache.spark.mllib.api.python
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.random.{RandomRDDGenerators = RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
@@ -227,7 +229,7 @@ class PythonMLLibAPI extends Serializable {
   jsc: JavaSparkContext,
   path: String,
   minPartitions: Int): JavaRDD[Array[Byte]] =
-MLUtils.loadLabeledPoints(jsc.sc, path, 
minPartitions).map(serializeLabeledPoint).toJavaRDD()
+MLUtils.loadLabeledPoints(jsc.sc, path, 
minPartitions).map(serializeLabeledPoint)
 
   private def trainRegressionModel(
   trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
@@ -456,6 +458,37 @@ class PythonMLLibAPI extends Serializable {
 ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
 
+  /**
+   * Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
+   * Returns the correlation matrix serialized into a byte array understood by 
deserializers in
+   * pyspark.
+   */
+  def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
+val inputMatrix = X.rdd.map(deserializeDoubleVector(_))
+val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
+serializeDoubleMatrix(to2dArray(result))
+  }
+
+  /**
+   * Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], 
method: String).
+   */
+  def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): 
Double = {
+val xDeser = x.rdd.map(deserializeDouble(_))
+val yDeser = y.rdd.map(deserializeDouble(_))
+Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+  }
+
+  // used by the corr methods to retrieve the name of the correlation method 
passed in via pyspark
+  private def getCorrNameOrDefault(method: String) = {
+if (method == null) CorrelationNames.defaultCorrName else method
+  }
+
+  // Reformat a Matrix into Array[Array[Double]] for serialization
+  private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
+val values = matrix.toArray
+

git commit: [SPARK-2796] [mllib] DecisionTree bug fix: ordered categorical features

2014-08-01 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master d88e69561 -> 7058a5393


[SPARK-2796] [mllib] DecisionTree bug fix: ordered categorical features

Bug: In DecisionTree, the method 
sequentialBinSearchForOrderedCategoricalFeatureInClassification() indexed bins 
from 0 to (math.pow(2, featureCategories.toInt - 1) - 1). This upper bound is 
the bound for unordered categorical features, not ordered ones. The upper bound 
should be the arity (i.e., max value) of the feature.

Added new test to DecisionTreeSuite to catch this: regression stump with 
categorical variables of arity 2

Bug fix: Modified upper bound discussed above.

Also: Small improvements to coding style in DecisionTree.

CC mengxr manishamde

Author: Joseph K. Bradley joseph.kurata.brad...@gmail.com

Closes #1720 from jkbradley/decisiontree-bugfix2 and squashes the following 
commits:

225822f [Joseph K. Bradley] Bug: In DecisionTree, the method 
sequentialBinSearchForOrderedCategoricalFeatureInClassification() indexed bins 
from 0 to (math.pow(2, featureCategories.toInt - 1) - 1). This upper bound is 
the bound for unordered categorical features, not ordered ones. The upper bound 
should be the arity (i.e., max value) of the feature.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7058a539
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7058a539
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7058a539

Branch: refs/heads/master
Commit: 7058a5393bccc2f917189fa9b4cf7f314410b0de
Parents: d88e695
Author: Joseph K. Bradley joseph.kurata.brad...@gmail.com
Authored: Fri Aug 1 15:52:21 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Aug 1 15:52:21 2014 -0700

--
 .../apache/spark/mllib/tree/DecisionTree.scala  | 45 
 .../spark/mllib/tree/DecisionTreeSuite.scala| 29 +
 2 files changed, 56 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7058a539/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 7d123dd..382e76a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -498,7 +498,7 @@ object DecisionTree extends Serializable with Logging {
   val bin = binForFeatures(mid)
   val lowThreshold = bin.lowSplit.threshold
   val highThreshold = bin.highSplit.threshold
-  if ((lowThreshold < feature) && (highThreshold >= feature)){
+  if ((lowThreshold < feature) && (highThreshold >= feature)) {
 return mid
   }
   else if (lowThreshold >= feature) {
@@ -522,28 +522,36 @@ object DecisionTree extends Serializable with Logging {
   }
 
   /**
-   * Sequential search helper method to find bin for categorical feature.
+   * Sequential search helper method to find bin for categorical feature
+   * (for classification and regression).
*/
-  def sequentialBinSearchForOrderedCategoricalFeatureInClassification(): 
Int = {
+  def sequentialBinSearchForOrderedCategoricalFeature(): Int = {
 val featureCategories = strategy.categoricalFeaturesInfo(featureIndex)
-val numCategoricalBins = math.pow(2.0, featureCategories - 1).toInt - 1
+val featureValue = labeledPoint.features(featureIndex)
 var binIndex = 0
-while (binIndex < numCategoricalBins) {
+while (binIndex < featureCategories) {
   val bin = bins(featureIndex)(binIndex)
   val categories = bin.highSplit.categories
-  val features = labeledPoint.features
-  if (categories.contains(features(featureIndex))) {
+  if (categories.contains(featureValue)) {
 return binIndex
   }
   binIndex += 1
 }
+if (featureValue < 0 || featureValue >= featureCategories) {
+  throw new IllegalArgumentException(
+s"DecisionTree given invalid data:" +
+s" Feature $featureIndex is categorical with values in" +
+s" {0,...,${featureCategories - 1}," +
+s" but a data point gives it value $featureValue.\n" +
+  "  Bad data point: " + labeledPoint.toString)
+}
 -1
   }
 
   if (isFeatureContinuous) {
 // Perform binary search for finding bin for continuous features.
 val binIndex = binarySearchForBins()
-if (binIndex == -1){
+if (binIndex == -1) {
   throw new UnknownError("no bin was found for continuous variable.")
 }
 binIndex
@@ 

git commit: [SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD

2014-08-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 7058a5393 -> 880eabec3


[SPARK-2010] [PySpark] [SQL] support nested structure in SchemaRDD

Convert each Row in a JavaSchemaRDD into an Array[Any] and unpickle it as a tuple in 
Python, then convert it into a namedtuple, so users can access fields just like 
attributes.

This lets nested structures be accessed as objects, and it also reduces 
the size of the serialized data and improves performance.

root
 |-- field1: integer (nullable = true)
 |-- field2: string (nullable = true)
 |-- field3: struct (nullable = true)
 ||-- field4: integer (nullable = true)
 ||-- field5: array (nullable = true)
 |||-- element: integer (containsNull = false)
 |-- field6: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- field7: string (nullable = true)

Then we can access them by row.field3.field5[0] or row.field6[5].field7.

It also infers the schema in Python, converts Row/dict/namedtuple/objects 
into tuples before serialization, then calls applySchema in the JVM. During 
inferSchema(), the top level of a dict in a row becomes a StructType, but any nested 
dictionary becomes a MapType.

You can use pyspark.sql.Row to convert an unnamed structure into a Row object, 
so that the RDD's schema can be inferred. For example:

ctx.inferSchema(rdd.map(lambda x: Row(a=x[0], b=x[1])))

Or you could use Row to create a class just like namedtuple, for example:

Person = Row("name", "age")
ctx.inferSchema(rdd.map(lambda x: Person(*x)))

Also, you can call applySchema to apply a schema to an RDD of tuples/lists and 
turn it into a SchemaRDD. The `schema` should be a StructType; see the API docs 
for details.

schema = StructType([StructField("name", StringType, True),
                     StructField("age", IntegerType, True)])
ctx.applySchema(rdd, schema)

PS: In order to use namedtuple to inferSchema, you should make namedtuple 
picklable.

Author: Davies Liu davies@gmail.com

Closes #1598 from davies/nested and squashes the following commits:

f1d15b6 [Davies Liu] verify schema with the first few rows
8852aaf [Davies Liu] check type of schema
abe9e6e [Davies Liu] address comments
61b2292 [Davies Liu] add @deprecated to pythonToJavaMap
1e5b801 [Davies Liu] improve cache of classes
51aa135 [Davies Liu] use Row to infer schema
e9c0d5c [Davies Liu] remove string typed schema
353a3f2 [Davies Liu] fix code style
63de8f8 [Davies Liu] fix typo
c79ca67 [Davies Liu] fix serialization of nested data
6b258b5 [Davies Liu] fix pep8
9d8447c [Davies Liu] apply schema provided by string of names
f5df97f [Davies Liu] refactor, address comments
9d9af55 [Davies Liu] use arrry to applySchema and infer schema in Python
84679b3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
nested
0eaaf56 [Davies Liu] fix doc tests
b3559b4 [Davies Liu] use generated Row instead of namedtuple
c4ddc30 [Davies Liu] fix conflict between name of fields and variables
7f6f251 [Davies Liu] address all comments
d69d397 [Davies Liu] refactor
2cc2d45 [Davies Liu] refactor
182fb46 [Davies Liu] refactor
bc6e9e1 [Davies Liu] switch to new Schema API
547bf3e [Davies Liu] Merge branch 'master' into nested
a435b5a [Davies Liu] add docs and code refactor
2c8debc [Davies Liu] Merge branch 'master' into nested
644665a [Davies Liu] use tuple and namedtuple for schemardd


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/880eabec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/880eabec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/880eabec

Branch: refs/heads/master
Commit: 880eabec37c69ce4e9594d7babfac291b0f93f50
Parents: 7058a53
Author: Davies Liu davies@gmail.com
Authored: Fri Aug 1 18:47:41 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Aug 1 18:47:41 2014 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala |   69 +-
 python/pyspark/rdd.py   |8 +-
 python/pyspark/sql.py   | 1258 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   87 +-
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |   18 +-
 5 files changed, 996 insertions(+), 444 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/880eabec/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 94d666a..fe9a9e5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import 

git commit: [SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).

2014-08-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 880eabec3 - 3822f33f3


[SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).

We need to carefully set the outputPartitioning of the HashOuterJoin operator. 
Otherwise, we may not correctly handle nulls.

Author: Yin Huai h...@cse.ohio-state.edu

Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits:

ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin 
operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3822f33f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3822f33f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3822f33f

Branch: refs/heads/master
Commit: 3822f33f3ce1428703a4796d7a119b40a6b32259
Parents: 880eabe
Author: Yin Huai h...@cse.ohio-state.edu
Authored: Fri Aug 1 18:52:01 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Fri Aug 1 18:52:01 2014 -0700

--
 .../org/apache/spark/sql/execution/joins.scala  |  9 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 99 
 .../scala/org/apache/spark/sql/TestData.scala   |  8 ++
 3 files changed, 114 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3822f33f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 82f0a74..cc138c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -158,7 +158,12 @@ case class HashOuterJoin(
 left: SparkPlan,
 right: SparkPlan) extends BinaryNode {
 
-  override def outputPartitioning: Partitioning = left.outputPartitioning
+  override def outputPartitioning: Partitioning = joinType match {
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
+    case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
+  }
 
   override def requiredChildDistribution =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
@@ -309,7 +314,7 @@ case class HashOuterJoin(
 leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), 
 rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
 }
-case x => throw new Exception(s"Need to add implementation for $x")
+case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3822f33f/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 0378906..2fc8058 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -197,6 +197,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   (4, "D", 4, "d") ::
   (5, "E", null, null) ::
   (6, "F", null, null) :: Nil)
+
+// Make sure we are choosing left.outputPartitioning as the
+// outputPartitioning for the outer join operator.
+checkAnswer(
+  sql(
+"""
+  |SELECT l.N, count(*)
+  |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+  |GROUP BY l.N
+""".stripMargin),
+  (1, 1) ::
+  (2, 1) ::
+  (3, 1) ::
+  (4, 1) ::
+  (5, 1) ::
+  (6, 1) :: Nil)
+
+checkAnswer(
+  sql(
+"""
+  |SELECT r.a, count(*)
+  |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+  |GROUP BY r.a
+""".stripMargin),
+  (null, 6) :: Nil)
   }
 
   test(right outer join) {
@@ -232,6 +257,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   (4, "d", 4, "D") ::
   (null, null, 5, "E") ::
   (null, null, 6, "F") :: Nil)
+
+// Make sure we are choosing right.outputPartitioning as the
+// outputPartitioning for the outer join operator.
+checkAnswer(
+  sql(
+"""
+  |SELECT l.a, count(*)
+  |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+  |GROUP BY l.a
+""".stripMargin),
+  (null, 6) :: Nil)
+
+checkAnswer(
+  sql(
+"""
+  |SELECT r.N, count(*)
+  |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+  |GROUP BY r.N
+

git commit: [SPARK-2116] Load spark-defaults.conf from SPARK_CONF_DIR if set

2014-08-01 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master 3822f33f3 - 0da07da53


[SPARK-2116] Load spark-defaults.conf from SPARK_CONF_DIR if set

If the SPARK_CONF_DIR environment variable is set, search it for 
spark-defaults.conf.

Author: Albert Chu ch...@llnl.gov

Closes #1059 from chu11/SPARK-2116 and squashes the following commits:

9f3ac94 [Albert Chu] SPARK-2116: If SPARK_CONF_DIR environment variable is set, 
search it for spark-defaults.conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0da07da5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0da07da5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0da07da5

Branch: refs/heads/master
Commit: 0da07da53e5466ec44c8050020cbc4b9957cb949
Parents: 3822f33
Author: Albert Chu ch...@llnl.gov
Authored: Fri Aug 1 19:00:38 2014 -0700
Committer: Matei Zaharia ma...@databricks.com
Committed: Fri Aug 1 19:00:46 2014 -0700

--
 .../org/apache/spark/deploy/SparkSubmitArguments.scala   | 11 +++
 1 file changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0da07da5/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index dd044e6..9391f24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -86,6 +86,17 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) 
{
   private def mergeSparkProperties(): Unit = {
 // Use common defaults file, if not specified by user
 if (propertiesFile == null) {
+  sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
+val sep = File.separator
+val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
+val file = new File(defaultPath)
+if (file.exists()) {
+  propertiesFile = file.getAbsolutePath
+}
+  }
+}
+
+if (propertiesFile == null) {
   sys.env.get("SPARK_HOME").foreach { sparkHome =>
 val sep = File.separator
 val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"



git commit: [SPARK-2800]: Exclude scalastyle-output.xml Apache RAT checks

2014-08-01 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 0da07da53 - a38d3c9ef


[SPARK-2800]: Exclude scalastyle-output.xml Apache RAT checks

Author: GuoQiang Li wi...@qq.com

Closes #1729 from witgo/SPARK-2800 and squashes the following commits:

13ca966 [GuoQiang Li] Add scalastyle-output.xml  to .rat-excludes file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a38d3c9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a38d3c9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a38d3c9e

Branch: refs/heads/master
Commit: a38d3c9efcc0386b52ac4f041920985ae7300e28
Parents: 0da07da
Author: GuoQiang Li wi...@qq.com
Authored: Fri Aug 1 19:35:16 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Fri Aug 1 19:35:16 2014 -0700

--
 .rat-excludes | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a38d3c9e/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 372bc25..bccb043 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -55,3 +55,4 @@ dist/*
 .*ipr
 .*iws
 logs
+.*scalastyle-output.xml



git commit: [SPARK-2764] Simplify daemon.py process structure

2014-08-01 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master a38d3c9ef - e8e0fd691


[SPARK-2764] Simplify daemon.py process structure

Currently, daemon.py forks a pool of numProcessors subprocesses, and those 
processes fork themselves again to create the actual Python worker processes 
that handle data.

I think that this extra layer of indirection is unnecessary and adds a lot of 
complexity.  This commit attempts to remove this middle layer of subprocesses 
by launching the workers directly from daemon.py.

See https://github.com/mesos/spark/pull/563 for the original PR that added 
daemon.py, where I raise some issues with the current design.

Author: Josh Rosen joshro...@apache.org

Closes #1680 from JoshRosen/pyspark-daemon and squashes the following commits:

5abbcb9 [Josh Rosen] Replace magic number: 4 - EINTR
5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails.
b79254d [Josh Rosen] Detect failed fork() calls; improve error logging.
282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems:
8554536 [Josh Rosen] Fix daemon’s shutdown(); log shutdown reason.
4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death.
e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8e0fd69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8e0fd69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8e0fd69

Branch: refs/heads/master
Commit: e8e0fd691a06a2887fdcffb2217b96805ace0cb0
Parents: a38d3c9
Author: Josh Rosen joshro...@apache.org
Authored: Fri Aug 1 19:38:21 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Fri Aug 1 19:38:21 2014 -0700

--
 .../spark/api/python/PythonWorkerFactory.scala  |  10 +-
 python/pyspark/daemon.py| 179 ---
 2 files changed, 79 insertions(+), 110 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 759cbe2..15fe8a9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 
   // Attempt to connect, restart and retry once if it fails
   try {
-new Socket(daemonHost, daemonPort)
+val socket = new Socket(daemonHost, daemonPort)
+val launchStatus = new DataInputStream(socket.getInputStream).readInt()
+if (launchStatus != 0) {
+  throw new IllegalStateException("Python daemon failed to launch worker")
+}
+socket
   } catch {
 case exc: SocketException =
-  logWarning("Python daemon unexpectedly quit, attempting to restart")
+  logWarning("Failed to open socket to Python daemon:", exc)
+  logWarning("Assuming that daemon unexpectedly quit, attempting to restart")
   stopDaemon()
   startDaemon()
   new Socket(daemonHost, daemonPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/python/pyspark/daemon.py
--
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 8a5873d..9fde0dd 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -15,64 +15,39 @@
 # limitations under the License.
 #
 
+import numbers
 import os
 import signal
+import select
 import socket
 import sys
 import traceback
-import multiprocessing
-from ctypes import c_bool
 from errno import EINTR, ECHILD
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
 from pyspark.serializers import write_int
 
-try:
-POOLSIZE = multiprocessing.cpu_count()
-except NotImplementedError:
-POOLSIZE = 4
-
-exit_flag = multiprocessing.Value(c_bool, False)
-
-
-def should_exit():
-global exit_flag
-return exit_flag.value
-
 
 def compute_real_exit_code(exit_code):
 # SystemExit's code can be integer or string, but os._exit only accepts 
integers
-import numbers
 if isinstance(exit_code, numbers.Integral):
 return exit_code
 else:
 return 1
 
 
-def worker(listen_sock):
+def worker(sock):
+"""
+Called by a worker process after the fork().
+"""
 # Redirect stdout to stderr
 os.dup2(2, 1)
 sys.stdout 

git commit: Streaming mllib [SPARK-2438][MLLIB]

2014-08-01 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master e8e0fd691 - f6a189930


Streaming mllib [SPARK-2438][MLLIB]

This PR implements a streaming linear regression analysis, in which a linear 
regression model is trained online as new data arrive. The design is based on 
discussions with tdas and mengxr, in which we determined how to add this 
functionality in a general way, with minimal changes to existing libraries.

__Summary of additions:__

_StreamingLinearAlgorithm_
- An abstract class for fitting generalized linear models online to streaming 
data, including training on (and updating) a model, and making predictions.

_StreamingLinearRegressionWithSGD_
- Class and companion object for running streaming linear regression

_StreamingLinearRegressionTestSuite_
- Unit tests

_StreamingLinearRegression_
- Example use case: fitting a model online to data from one stream, and making 
predictions on other data

__Notes__
- If this looks good, I can use the StreamingLinearAlgorithm class to easily 
implement other analyses that follow the same logic (Ridge, Lasso, Logistic, 
SVM).
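
A minimal Scala usage sketch (assuming an existing StreamingContext `ssc` and 
two DStream[LabeledPoint] inputs, `trainingStream` and `testStream`, built 
elsewhere; method and setter names follow the classes listed above, so treat 
this as illustrative rather than authoritative):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

    // Initial weights must be set before training starts.
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0, 0.0))
      .setStepSize(0.5)
      .setNumIterations(10)

    model.trainOn(trainingStream)        // update the model on each training batch
    model.predictOn(testStream).print()  // emit predictions for the other stream

    ssc.start()
    ssc.awaitTermination()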

Author: Jeremy Freeman the.freeman@gmail.com
Author: freeman the.freeman@gmail.com

Closes #1361 from freeman-lab/streaming-mllib and squashes the following 
commits:

775ea29 [Jeremy Freeman] Throw error if user doesn't initialize weights
4086fee [Jeremy Freeman] Fixed current weight formatting
8b95b27 [Jeremy Freeman] Restored broadcasting
29f27ec [Jeremy Freeman] Formatting
8711c41 [Jeremy Freeman] Used return to avoid indentation
777b596 [Jeremy Freeman] Restored treeAggregate
74cf440 [Jeremy Freeman] Removed static methods
d28cf9a [Jeremy Freeman] Added usage notes
c3326e7 [Jeremy Freeman] Improved documentation
9541a41 [Jeremy Freeman] Merge remote-tracking branch 'upstream/master' into 
streaming-mllib
66eba5e [Jeremy Freeman] Fixed line lengths
2fe0720 [Jeremy Freeman] Minor cleanup
7d51378 [Jeremy Freeman] Moved streaming loader to MLUtils
b9b69f6 [Jeremy Freeman] Added setter methods
c3f8b5a [Jeremy Freeman] Modified logging
00aafdc [Jeremy Freeman] Add modifiers
14b801e [Jeremy Freeman] Name changes
c7d38a3 [Jeremy Freeman] Move check for empty data to GradientDescent
4b0a5d3 [Jeremy Freeman] Cleaned up tests
74188d6 [Jeremy Freeman] Eliminate dependency on commons
50dd237 [Jeremy Freeman] Removed experimental tag
6bfe1e6 [Jeremy Freeman] Fixed imports
a2a63ad [freeman] Makes convergence test more robust
86220bc [freeman] Streaming linear regression unit tests
fb4683a [freeman] Minor changes for scalastyle consistency
fd31e03 [freeman] Changed logging behavior
453974e [freeman] Fixed indentation
c4b1143 [freeman] Streaming linear regression
604f4d7 [freeman] Expanded private class to include mllib
d99aa85 [freeman] Helper methods for streaming MLlib apps
0898add [freeman] Added dependency on streaming


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6a18993
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6a18993
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6a18993

Branch: refs/heads/master
Commit: f6a1899306c5ad766fea122d3ab4b83436d9f6fd
Parents: e8e0fd6
Author: Jeremy Freeman the.freeman@gmail.com
Authored: Fri Aug 1 20:10:26 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Aug 1 20:10:26 2014 -0700

--
 .../mllib/StreamingLinearRegression.scala   |  73 ++
 mllib/pom.xml   |   5 +
 .../mllib/optimization/GradientDescent.scala|   9 ++
 .../mllib/regression/LinearRegression.scala |   4 +-
 .../regression/StreamingLinearAlgorithm.scala   | 106 +++
 .../StreamingLinearRegressionWithSGD.scala  |  88 
 .../org/apache/spark/mllib/util/MLUtils.scala   |  15 +++
 .../StreamingLinearRegressionSuite.scala| 135 +++
 8 files changed, 433 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6a18993/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
 
b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
new file mode 100644
index 000..1fd37ed
--- /dev/null
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not 

git commit: [SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercept in pyspark's linear methods.

2014-08-01 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master f6a189930 - c28118922


[SPARK-2550][MLLIB][APACHE SPARK] Support regularization and intercept in 
pyspark's linear methods.

Related to issue: 
[SPARK-2550](https://issues.apache.org/jira/browse/SPARK-2550?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Major%20ORDER%20BY%20key%20DESC).
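
The new Python options map onto the optimizer configuration of the underlying 
Scala LinearRegressionWithSGD, as the diff below shows. For reference, a rough 
Scala equivalent (assuming `training` is an existing RDD[LabeledPoint]):

    import org.apache.spark.mllib.optimization.L1Updater
    import org.apache.spark.mllib.regression.LinearRegressionWithSGD

    val lr = new LinearRegressionWithSGD()
    lr.setIntercept(true)            // also fit an intercept term
    lr.optimizer
      .setNumIterations(100)
      .setStepSize(0.1)
      .setRegParam(0.01)
      .setUpdater(new L1Updater)     // L1 regularization; SquaredL2Updater for "l2"
    val model = lr.run(training)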

Author: Michael Giannakopoulos miccagi...@gmail.com

Closes #1624 from miccagiann/new-branch and squashes the following commits:

c02e5f5 [Michael Giannakopoulos] Merge cleanly with upstream/master.
8dcb888 [Michael Giannakopoulos] Putting the if/else if statements in brackets.
fed8eaa [Michael Giannakopoulos] Adding a space in the message related to the 
IllegalArgumentException.
44e6ff0 [Michael Giannakopoulos] Adding a blank line before python class 
LinearRegressionWithSGD.
8eba9c5 [Michael Giannakopoulos] Change function signatures. Exception is 
thrown from the scala component and not from the python one.
638be47 [Michael Giannakopoulos] Modified code to comply with code standards.
ec50ee9 [Michael Giannakopoulos] Shorten the if-elif-else statement in 
regression.py file
b962744 [Michael Giannakopoulos] Replaced the enum classes, with 
strings-keywords for defining the values of 'regType' parameter.
78853ec [Michael Giannakopoulos] Providing intercept and regualizer 
functionallity for linear methods in only one function.
3ac8874 [Michael Giannakopoulos] Added support for regularizer and intercection 
parameters for linear regression method.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2811892
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2811892
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2811892

Branch: refs/heads/master
Commit: c281189222e645d2c87277c269e2102c3c8ccc95
Parents: f6a1899
Author: Michael Giannakopoulos miccagi...@gmail.com
Authored: Fri Aug 1 21:00:31 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Aug 1 21:00:31 2014 -0700

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 28 -
 python/pyspark/mllib/regression.py  | 32 +---
 2 files changed, 49 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c2811892/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 122925d..7d91273 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,6 +23,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.random.{RandomRDDGenerators = RG}
 import org.apache.spark.mllib.recommendation._
@@ -252,15 +254,27 @@ class PythonMLLibAPI extends Serializable {
   numIterations: Int,
   stepSize: Double,
   miniBatchFraction: Double,
-  initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+  initialWeightsBA: Array[Byte], 
+  regParam: Double,
+  regType: String,
+  intercept: Boolean): java.util.List[java.lang.Object] = {
+val lrAlg = new LinearRegressionWithSGD()
+lrAlg.setIntercept(intercept)
+lrAlg.optimizer
+  .setNumIterations(numIterations)
+  .setRegParam(regParam)
+  .setStepSize(stepSize)
+if (regType == "l2") {
+  lrAlg.optimizer.setUpdater(new SquaredL2Updater)
+} else if (regType == "l1") {
+  lrAlg.optimizer.setUpdater(new L1Updater)
+} else if (regType != "none") {
+  throw new java.lang.IllegalArgumentException("Invalid value for 'regType' parameter."
+    + " Can only be initialized using the following string values: [l1, l2, none].")
+}
 trainRegressionModel(
   (data, initialWeights) =
-LinearRegressionWithSGD.train(
-  data,
-  numIterations,
-  stepSize,
-  miniBatchFraction,
-  initialWeights),
+lrAlg.run(data, initialWeights),
   dataBytesJRDD,
   initialWeightsBA)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c2811892/python/pyspark/mllib/regression.py
--
diff --git 

git commit: [SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGenerator. RandomRDD is now of generic type

2014-08-01 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master e25ec0617 - fda475987


[SPARK-2801][MLlib]: DistributionGenerator renamed to RandomDataGenerator. 
RandomRDD is now of generic type

RandomRDDGenerators used to output only RDD[Double].
Now RandomRDDGenerators.randomRDD can generate a random RDD[T] from any class 
that extends RandomDataGenerator[T], by supplying the type T and overriding the 
nextValue() function as desired.
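
For example, a sketch of a custom generator (class and argument names here are 
illustrative; the randomRDD argument list mirrors the pre-existing Double-based 
API, and an existing SparkContext `sc` is assumed):

    import org.apache.spark.mllib.random.{RandomDataGenerator, RandomRDDGenerators}

    // Any type T works, as long as nextValue() returns it.
    class LabelGenerator extends RandomDataGenerator[String] {
      private val rng = new java.util.Random()
      override def nextValue(): String = if (rng.nextDouble() < 0.5) "spam" else "ham"
      override def setSeed(seed: Long): Unit = rng.setSeed(seed)
      override def copy(): LabelGenerator = new LabelGenerator()
    }

    // 1000 random labels in 4 partitions with seed 11.
    val labels = RandomRDDGenerators.randomRDD(sc, new LabelGenerator, 1000L, 4, 11L)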

Author: Burak brk...@gmail.com

Closes #1732 from brkyvz/SPARK-2801 and squashes the following commits:

c94a694 [Burak] [SPARK-2801][MLlib] Missing ClassTags added
22d96fe [Burak] [SPARK-2801][MLlib]: DistributionGenerator renamed to 
RandomDataGenerator, generic types added for RandomRDD instead of Double


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fda47598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fda47598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fda47598

Branch: refs/heads/master
Commit: fda475987f3b8b37d563033b0e45706ce433824a
Parents: e25ec06
Author: Burak brk...@gmail.com
Authored: Fri Aug 1 22:32:12 2014 -0700
Committer: Xiangrui Meng m...@databricks.com
Committed: Fri Aug 1 22:32:12 2014 -0700

--
 .../mllib/random/DistributionGenerator.scala| 101 ---
 .../mllib/random/RandomDataGenerator.scala  | 101 +++
 .../mllib/random/RandomRDDGenerators.scala  |  32 +++---
 .../org/apache/spark/mllib/rdd/RandomRDD.scala  |  34 ---
 .../random/DistributionGeneratorSuite.scala |  90 -
 .../mllib/random/RandomDataGeneratorSuite.scala |  90 +
 .../mllib/random/RandomRDDGeneratorsSuite.scala |   8 +-
 7 files changed, 231 insertions(+), 225 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fda47598/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
deleted file mode 100644
index 7ecb409..000
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.random
-
-import cern.jet.random.Poisson
-import cern.jet.random.engine.DRand
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
-
-/**
- * :: Experimental ::
- * Trait for random number generators that generate i.i.d. values from a 
distribution.
- */
-@Experimental
-trait DistributionGenerator extends Pseudorandom with Serializable {
-
-  /**
-   * Returns an i.i.d. sample as a Double from an underlying distribution.
-   */
-  def nextValue(): Double
-
-  /**
-   * Returns a copy of the DistributionGenerator with a new instance of the 
rng object used in the
-   * class when applicable for non-locking concurrent usage.
-   */
-  def copy(): DistributionGenerator
-}
-
-/**
- * :: Experimental ::
- * Generates i.i.d. samples from U[0.0, 1.0]
- */
-@Experimental
-class UniformGenerator extends DistributionGenerator {
-
-  // XORShiftRandom for better performance. Thread safety isn't necessary here.
-  private val random = new XORShiftRandom()
-
-  override def nextValue(): Double = {
-random.nextDouble()
-  }
-
-  override def setSeed(seed: Long) = random.setSeed(seed)
-
-  override def copy(): UniformGenerator = new UniformGenerator()
-}
-
-/**
- * :: Experimental ::
- * Generates i.i.d. samples from the standard normal distribution.
- */
-@Experimental
-class StandardNormalGenerator extends DistributionGenerator {
-
-  // XORShiftRandom for better performance. Thread safety isn't necessary here.
-  private val random = new XORShiftRandom()
-
-  override def nextValue(): Double = {
-  random.nextGaussian()
-  }
-
-  override def setSeed(seed: Long) =