[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1520


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-27 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50289239
  
LGTM. Merged into master. Thanks for adding random RDD generators!!
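
For context, a minimal sketch of how the merged API can be used (the `RandomRDDGenerators.normalRDD` signature is taken from the PR's scaladoc quoted later in this thread; the surrounding app setup is illustrative):

~~~
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.random.RandomRDDGenerators

// Generate one million i.i.d. N(0.0, 1.0) samples across 4 partitions.
// A fixed seed keeps the RDD contents reproducible across recomputations.
val sc = new SparkContext(new SparkConf().setAppName("random-rdd-sketch").setMaster("local[4]"))
val normals = RandomRDDGenerators.normalRDD(sc, 1000000L, 4, 42L)
println(normals.stats())  // expect count = 1000000, mean ~ 0.0, stdev ~ 1.0
sc.stop()
~~~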




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50215914
  
QA results for PR 1520:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait DistributionGenerator extends Pseudorandom with Serializable {
  class UniformGenerator extends DistributionGenerator {
  class StandardNormalGenerator extends DistributionGenerator {
  class PoissonGenerator(val mean: Double) extends DistributionGenerator {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50213682
  
QA tests have started for PR 1520. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread dorx
Github user dorx commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50213475
  
Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50208419
  
QA results for PR 1520:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait DistributionGenerator extends Pseudorandom with Serializable {
  class UniformGenerator extends DistributionGenerator {
  class StandardNormalGenerator extends DistributionGenerator {
  class PoissonGenerator(val mean: Double) extends DistributionGenerator {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50204159
  
QA tests have started for PR 1520. This patch merges cleanly. View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15423644
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.StatCounter
+
+// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+class DistributionGeneratorSuite extends FunSuite {
+
+  def apiChecks(gen: DistributionGenerator) {
+
+    // resetting seed should generate the same sequence of random numbers
+    gen.setSeed(42L)
+    val array1 = (0 until 1000).map(_ => gen.nextValue())
+    gen.setSeed(42L)
+    val array2 = (0 until 1000).map(_ => gen.nextValue())
+    assert(array1.equals(array2))
+
+    // newInstance should contain a different instance of the rng,
+    // i.e. setting different seeds for different instances produces different sequences of
+    // random numbers.
+    val gen2 = gen.copy()
+    gen.setSeed(0L)
+    val array3 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(1L)
+    val array4 = (0 until 1000).map(_ => gen2.nextValue())
+    // Compare arrays instead of elements since individual elements can coincide by chance but the
+    // sequences should differ given two different seeds.
+    assert(!array3.equals(array4))
+
+    // test that setting the same seed in the copied instance produces the same sequence of numbers
+    gen.setSeed(0L)
+    val array5 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(0L)
+    val array6 = (0 until 1000).map(_ => gen2.nextValue())
+    assert(array5.equals(array6))
+  }
+
+  def distributionChecks(gen: DistributionGenerator,
+      mean: Double = 0.0,
+      stddev: Double = 1.0,
+      epsilon: Double = 1e-3) {
+    for (seed <- 0 until 5) {
+      gen.setSeed(seed.toLong)
+      val sample = (0 until 1000).map { _ => gen.nextValue()}
--- End diff --

Old tests ran for 21 s on my laptop. Shaved a couple of zeros off everywhere and now it finishes in 986 milliseconds.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-50120556
  
@dorx Besides the comments, could you mark the distribution generators and the methods that require distribution generators `@Experimental`? Part of the reason is that we don't have the API in Python yet, and it is not clear whether we should implement the same there.
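
A sketch of the requested change (the `Experimental` annotation lives in `org.apache.spark.annotation`; annotating the object here is just one possible placement, and Spark's convention also adds the `:: Experimental ::` tag to the scaladoc):

~~~
import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Generator methods for creating RDDs comprised of i.i.d. samples from some distribution.
 */
@Experimental
object RandomRDDGenerators {
  // generator methods elided; public methods would carry the annotation as well
}
~~~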




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15390040
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
+  require(size > 0, "Positive partition size required.")
+}
+
+// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
--- End diff --

Could it be a `val`?




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389923
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.StatCounter
+
+// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+class DistributionGeneratorSuite extends FunSuite {
+
+  def apiChecks(gen: DistributionGenerator) {
+
+    // resetting seed should generate the same sequence of random numbers
+    gen.setSeed(42L)
+    val array1 = (0 until 1000).map(_ => gen.nextValue())
+    gen.setSeed(42L)
+    val array2 = (0 until 1000).map(_ => gen.nextValue())
+    assert(array1.equals(array2))
+
+    // newInstance should contain a different instance of the rng,
+    // i.e. setting different seeds for different instances produces different sequences of
+    // random numbers.
+    val gen2 = gen.copy()
+    gen.setSeed(0L)
+    val array3 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(1L)
+    val array4 = (0 until 1000).map(_ => gen2.nextValue())
+    // Compare arrays instead of elements since individual elements can coincide by chance but the
+    // sequences should differ given two different seeds.
+    assert(!array3.equals(array4))
+
+    // test that setting the same seed in the copied instance produces the same sequence of numbers
+    gen.setSeed(0L)
+    val array5 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(0L)
+    val array6 = (0 until 1000).map(_ => gen2.nextValue())
+    assert(array5.equals(array6))
+  }
+
+  def distributionChecks(gen: DistributionGenerator,
+      mean: Double = 0.0,
+      stddev: Double = 1.0,
+      epsilon: Double = 1e-3) {
+    for (seed <- 0 until 5) {
+      gen.setSeed(seed.toLong)
+      val sample = (0 until 1000).map { _ => gen.nextValue()}
--- End diff --

Just curious, how long does the entire test suite take to finish?




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389784
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/**
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+  def testGeneratedRDD(rdd: RDD[Double],
+      expectedSize: Long,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    val stats = rdd.stats()
+    assert(expectedSize === stats.count)
+    assert(expectedNumPartitions === rdd.partitions.size)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  // assume test RDDs are small
+  def testGeneratedVectorRDD(rdd: RDD[Vector],
+      expectedRows: Long,
+      expectedColumns: Int,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    assert(expectedNumPartitions === rdd.partitions.size)
+    val values = new ArrayBuffer[Double]()
+    rdd.collect.foreach { vector => {
+      assert(vector.size === expectedColumns)
+      values ++= vector.toArray
+    }}
+    assert(expectedRows === values.size / expectedColumns)
+    val stats = new StatCounter(values)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  test("RandomRDD sizes") {
+
+    // some cases where size % numParts != 0 to test getPartitions behaves correctly
+    for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) {
+      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+      assert(rdd.count() === size)
+      assert(rdd.partitions.size === numPartitions)
+
+      // check that partition sizes are balanced
+      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+      val partStats = new StatCounter(partSizes)
+      assert(partStats.stdev < 1.0)
+    }
+
+    // size > Int.MaxValue
+    val size = Int.MaxValue.toLong * 100L
+    val numPartitions = 101
+    val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+    assert(rdd.partitions.size === numPartitions)
+    val count = rdd.partitions.foldLeft(0L){
+      (count, part) => count + part.asInstanceOf[RandomRDDPartition].size
+    }
+    assert(count === size)
+
+    // size needs to be positive
+    try {
+      new RandomRDD(sc, 0, 10, new UniformGenerator, 0L)
--- End diff --

You can use `intercept[IllegalArgumentException] { ... }`.
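
For reference, a self-contained sketch of the suggested pattern (the suite name and message below are illustrative):

~~~
import org.scalatest.FunSuite

class RequireCheckSuite extends FunSuite {
  test("non-positive size is rejected") {
    // intercept returns the thrown exception and fails the test if nothing is thrown
    val e = intercept[IllegalArgumentException] {
      require(0 > 0, "Positive RDD size required.")
    }
    assert(e.getMessage.contains("Positive RDD size required."))
  }
}
~~~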




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389761
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/**
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+  def testGeneratedRDD(rdd: RDD[Double],
+      expectedSize: Long,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    val stats = rdd.stats()
+    assert(expectedSize === stats.count)
+    assert(expectedNumPartitions === rdd.partitions.size)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  // assume test RDDs are small
+  def testGeneratedVectorRDD(rdd: RDD[Vector],
+      expectedRows: Long,
+      expectedColumns: Int,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    assert(expectedNumPartitions === rdd.partitions.size)
+    val values = new ArrayBuffer[Double]()
+    rdd.collect.foreach { vector => {
+      assert(vector.size === expectedColumns)
+      values ++= vector.toArray
+    }}
+    assert(expectedRows === values.size / expectedColumns)
+    val stats = new StatCounter(values)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  test("RandomRDD sizes") {
+
+    // some cases where size % numParts != 0 to test getPartitions behaves correctly
+    for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) {
+      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+      assert(rdd.count() === size)
+      assert(rdd.partitions.size === numPartitions)
+
+      // check that partition sizes are balanced
+      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+      val partStats = new StatCounter(partSizes)
+      assert(partStats.stdev < 1.0)
+    }
+
+    // size > Int.MaxValue
+    val size = Int.MaxValue.toLong * 100L
+    val numPartitions = 101
+    val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+    assert(rdd.partitions.size === numPartitions)
+    val count = rdd.partitions.foldLeft(0L){
+      (count, part) => count + part.asInstanceOf[RandomRDDPartition].size
--- End diff --

Move `(count, part) =>` to the line above and insert a space between `)` and `{`.
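
Applied to the quoted snippet, the suggested layout would read (reusing `rdd` and `RandomRDDPartition` from the diff above):

~~~
val count = rdd.partitions.foldLeft(0L) { (count, part) =>
  count + part.asInstanceOf[RandomRDDPartition].size
}
~~~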




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389694
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/**
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+  def testGeneratedRDD(rdd: RDD[Double],
+      expectedSize: Long,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    val stats = rdd.stats()
+    assert(expectedSize === stats.count)
+    assert(expectedNumPartitions === rdd.partitions.size)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  // assume test RDDs are small
+  def testGeneratedVectorRDD(rdd: RDD[Vector],
+      expectedRows: Long,
+      expectedColumns: Int,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    assert(expectedNumPartitions === rdd.partitions.size)
+    val values = new ArrayBuffer[Double]()
+    rdd.collect.foreach { vector => {
+      assert(vector.size === expectedColumns)
+      values ++= vector.toArray
+    }}
+    assert(expectedRows === values.size / expectedColumns)
+    val stats = new StatCounter(values)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  test("RandomRDD sizes") {
+
+    // some cases where size % numParts != 0 to test getPartitions behaves correctly
+    for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) {
+      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+      assert(rdd.count() === size)
+      assert(rdd.partitions.size === numPartitions)
+
+      // check that partition sizes are balanced
+      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
+      val partStats = new StatCounter(partSizes)
+      assert(partStats.stdev < 1.0)
--- End diff --

Why check the stdev of partition sizes? The check should be `maxPartitionSize - minPartitionSize <= 1`.
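
A sketch of the suggested assertion, reusing `rdd` and `RandomRDDPartition` from the diff above:

~~~
val partSizes = rdd.partitions.map(p => p.asInstanceOf[RandomRDDPartition].size)
assert(partSizes.max - partSizes.min <= 1)
~~~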




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389645
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
+  require(size > 0, "Positive partition size required.")
+}
+
+// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+    size: Long,
+    numSlices: Int,
--- End diff --

`numSlices` -> `numPartitions`, to match the naming in the existing code?




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389677
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/**
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
--- End diff --

Those comments are not JavaDoc. Could you move them inside the class body and change the first line from `/**` to `/*`?
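
In other words, something like the following sketch (test bodies elided):

~~~
class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {

  /*
   * Note: avoid including APIs that do not set the seed for the RNG in unit tests
   * in order to guarantee deterministic behavior.
   *
   * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
   */

  // tests elided
}
~~~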




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389671
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.StatCounter
+
+// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+class DistributionGeneratorSuite extends FunSuite {
+
+  def apiChecks(gen: DistributionGenerator) {
+
+    // resetting seed should generate the same sequence of random numbers
+    gen.setSeed(42L)
+    val array1 = (0 until 1000).map(_ => gen.nextValue())
+    gen.setSeed(42L)
+    val array2 = (0 until 1000).map(_ => gen.nextValue())
+    assert(array1.equals(array2))
+
+    // newInstance should contain a different instance of the rng,
+    // i.e. setting different seeds for different instances produces different sequences of
+    // random numbers.
+    val gen2 = gen.copy()
+    gen.setSeed(0L)
+    val array3 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(1L)
+    val array4 = (0 until 1000).map(_ => gen2.nextValue())
+    // Compare arrays instead of elements since individual elements can coincide by chance but the
+    // sequences should differ given two different seeds.
+    assert(!array3.equals(array4))
+
+    // test that setting the same seed in the copied instance produces the same sequence of numbers
+    gen.setSeed(0L)
+    val array5 = (0 until 1000).map(_ => gen.nextValue())
+    gen2.setSeed(0L)
+    val array6 = (0 until 1000).map(_ => gen2.nextValue())
+    assert(array5.equals(array6))
+  }
+
+  def distributionChecks(gen: DistributionGenerator,
+      mean: Double = 0.0,
+      stddev: Double = 1.0,
+      epsilon: Double = 1e-3) {
+    for (seed <- 0 until 5) {
+      gen.setSeed(seed.toLong)
+      val sample = (0 until 1000).map { _ => gen.nextValue()}
+      val stats = new StatCounter(sample)
+      assert(math.abs(stats.mean - mean) < epsilon)
+      assert(math.abs(stats.stdev - stddev) < epsilon)
+    }
+  }
+
+  test("UniformGenerator") {
+    val uniform = new UniformGenerator()
+    apiChecks(uniform)
+    // Stddev of uniform distribution = (ub - lb) / math.sqrt(12)
+    distributionChecks(uniform, 0.5, 1 / math.sqrt(12))
+  }
+
+  test("StandardNormalGenerator") {
+    val normal = new StandardNormalGenerator()
+    apiChecks(normal)
+    distributionChecks(normal, 0.0, 1.0)
+  }
+
+  test("PoissonGenerator") {
+    // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+    for (mean <- List(1.0, 5.0, 100.0)) {
+      val poisson = new PoissonGenerator(mean)
+      apiChecks(poisson)
+      distributionChecks(poisson, mean, math.sqrt(mean), 1e-2)
+    }
+  }
+}
+
--- End diff --

Remove the extra empty line.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389681
  
--- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
+import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+/**
+ * Note: avoid including APIs that do not set the seed for the RNG in unit tests
+ * in order to guarantee deterministic behavior.
+ *
+ * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
+ */
+class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
+
+  def testGeneratedRDD(rdd: RDD[Double],
+      expectedSize: Long,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    val stats = rdd.stats()
+    assert(expectedSize === stats.count)
+    assert(expectedNumPartitions === rdd.partitions.size)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  // assume test RDDs are small
+  def testGeneratedVectorRDD(rdd: RDD[Vector],
+      expectedRows: Long,
+      expectedColumns: Int,
+      expectedNumPartitions: Int,
+      expectedMean: Double,
+      expectedStddev: Double,
+      epsilon: Double = 0.01) {
+    assert(expectedNumPartitions === rdd.partitions.size)
+    val values = new ArrayBuffer[Double]()
+    rdd.collect.foreach { vector => {
+      assert(vector.size === expectedColumns)
+      values ++= vector.toArray
+    }}
+    assert(expectedRows === values.size / expectedColumns)
+    val stats = new StatCounter(values)
+    assert(math.abs(stats.mean - expectedMean) < epsilon)
+    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
+  }
+
+  test("RandomRDD sizes") {
+
+    // some cases where size % numParts != 0 to test getPartitions behaves correctly
+    for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) {
+      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
+      assert(rdd.count() === size)
+      assert(rdd.partitions.size === numPartitions)
+
+      // check that partition sizes are balanced
+      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
--- End diff --

`( ` -> `(`




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389665
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
+  require(size > 0, "Positive partition size required.")
+}
+
+// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+    size: Long,
+    numSlices: Int,
+    @transient rng: DistributionGenerator,
+    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
+    "Partition size cannot exceed Int.MaxValue")
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
+    val split = splitIn.asInstanceOf[RandomRDDPartition]
+    RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
+    size: Long,
+    vectorSize: Int,
+    numSlices: Int,
+    @transient rng: DistributionGenerator,
+    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
+    "Partition size cannot exceed Int.MaxValue")
+
+  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
+    val split = splitIn.asInstanceOf[RandomRDDPartition]
+    RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  def getPartitions(size: Long,
+      numSlices: Int,
+      rng: DistributionGenerator,
+      seed: Long): Array[Partition] = {
+
+    val partitions = new Array[RandomRDDPartition](numSlices)
+    var i = 0
+    var start: Long = 0
+    var end: Long = 0
+    val random = new Random(seed)
+    while (i < numSlices) {
+      end = ((i + 1) * size) / numSlices
+      partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
+      start = end
+      i += 1
+    }
+    partitions.asInstanceOf[Array[Partition]]
+  }
+
+  // The RNG has to be reset every time the iterator is requested to guarantee same data
+  // every time the content of the RDD is examined.
+  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
+    val generator = partition.generator.copy()
+    generator.setSeed(partition.seed)
+    Iterator.fill(partition.size)(generator.nextValue())
+  }
+
+  // The RNG has to be reset every time the iterator is requested to guarantee same data
+  // every 

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389640
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
+  require(size > 0, "Positive partition size required.")
--- End diff --

If `numPartitions` > `size`, there will be empty partitions. We should allow this case, because it happens when a user uses the default number of partitions, which may be greater than `size`.
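
For concreteness, a standalone sketch of the size formula used in `RandomRDD.getPartitions` (quoted elsewhere in this thread as `end = ((i + 1) * size) / numSlices`), showing how empty partitions arise:

~~~
// Computes per-partition sizes the same way RandomRDD.getPartitions does.
def partitionSizes(size: Long, numSlices: Int): Seq[Long] = {
  var start = 0L
  (0 until numSlices).map { i =>
    val end = ((i + 1) * size) / numSlices
    val len = end - start
    start = end
    len
  }
}

partitionSizes(10L, 4)  // Vector(2, 3, 2, 3): balanced, all non-empty
partitionSizes(2L, 4)   // Vector(0, 1, 0, 1): empty partitions once numSlices > size
~~~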




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389629
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+import scala.util.Random
+
+private[mllib] class RandomRDDPartition(override val index: Int,
+    val size: Int,
+    val generator: DistributionGenerator,
+    val seed: Long) extends Partition {
+  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
--- End diff --

The comment needs updating, because it is possible for a Long greater than Int.MaxValue to cast to a positive Int:

~~~
scala> 100000000000L.toInt
res0: Int = 1215752192
~~~
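
One way to make the intent explicit (a sketch; `toIntSize` is a hypothetical helper, not part of the PR):

~~~
// A Long above Int.MaxValue can silently truncate to a positive Int,
// so validate the range before casting.
def toIntSize(size: Long): Int = {
  require(size > 0 && size <= Int.MaxValue, s"Invalid partition size: $size")
  size.toInt
}

100000000000L.toInt         // 1215752192: positive, but wrong
// toIntSize(100000000000L) // would throw IllegalArgumentException instead
~~~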




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389602
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * Returns an i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the
+   * class when applicable for non-locking concurrent usage.
+   */
+  def copy(): DistributionGenerator
+}
+
+/**
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+class UniformGenerator extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary here.
+  private val random = new XORShiftRandom()
+
+  override def nextValue(): Double = {
+    random.nextDouble()
+  }
+
+  override def setSeed(seed: Long) = random.setSeed(seed)
+
+  override def copy(): UniformGenerator = new UniformGenerator()
+}
+
+/**
+ * Generates i.i.d. samples from the Standard Normal Distribution.
--- End diff --

`Standard Normal Distribution` -> `standard normal distribution`




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389618
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+/**
+ * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
+ *
+ * TODO Generate RDD[Vector] from multivariate distributions.
+ */
+object RandomRDDGenerators {
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+    val uniform = new UniformGenerator()
+    randomRDD(sc, uniform, size, numPartitions, seed)
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
+   */
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+   */
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
+    val normal = new StandardNormalGenerator()
+    randomRDD(sc, normal, size, numPartitions, seed)
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
+   */
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
+    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
+   * sc.defaultParallelism used for the number of partitions in the RDD.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param size Size of the RDD.
+   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389612
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+/**
+ * Generator methods for creating RDDs comprised of i.i.d samples from 
some distribution.
+ *
+ * TODO Generate RDD[Vector] from multivariate distributions.
--- End diff --

Move the `TODO: ...` inside the code block; otherwise it becomes part of the
generated documentation.
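
For illustration, a sketch of the suggested fix: with the TODO moved below the
doc comment it no longer shows up in the generated Scaladoc.

~~~
/**
 * Generator methods for creating RDDs comprised of i.i.d. samples from
 * some distribution.
 */
object RandomRDDGenerators {
  // TODO: Generate RDD[Vector] from multivariate distributions.
}
~~~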




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-25 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15389599
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
--- End diff --

`i.i.d` -> `i.i.d.` and in other places




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-49946571
  
QA results for PR 1520:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
    trait DistributionGenerator extends Pseudorandom with Serializable {
    class UniformGenerator extends DistributionGenerator {
    class StandardNormalGenerator extends DistributionGenerator {
    class PoissonGenerator(val mean: Double) extends DistributionGenerator {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-49935146
  
QA tests have started for PR 1520. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15307029
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val normal = new StandardNormalGenerator()
+randomRDD(sc, size, numPartitions, normal, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  mean: Double,
+  seed: Long): RDD[Double] = {
+val poisson = new PoissonGenerator(mean)
+randomRDD(sc, size, numPartitions, poisson, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): 
RDD[Double] = {
+poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: 
Double): RDD[Double] = {
+poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] 
= {
+poissonRDD(sc, size, sc.defaultParallelism, mean, 
Utils.random.nextLong)
+  }
+
+  def randomRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  distribution: DistributionGenerator,
--- End diff --

Hmm let's go with `generator` to be clear.
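
Separately, the TODO in the quoted file floats a builder-style API as a way to
cut down the overload count; a purely hypothetical sketch of that shape
(`RandomRDDBuilder` and its methods are not part of the PR):

~~~
import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDGenerators
import org.apache.spark.rdd.RDD

// Hypothetical sketch only; none of these names exist in the PR.
class RandomRDDBuilder(sc: SparkContext) {
  private var numPartitions: Int = sc.defaultParallelism
  private var seed: Long = Random.nextLong()

  def withNumPartitions(n: Int): this.type = { numPartitions = n; this }
  def withSeed(s: Long): this.type = { seed = s; this }

  def uniform(size: Long): RDD[Double] =
    RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed)
}

// new RandomRDDBuilder(sc).withSeed(42L).uniform(1000000L)
~~~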




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15305960
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
+  } else {
+new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), 
seed)
+  }
+  i += 1
+}
+partitions.asInstanceOf[Array[Partition]]

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15275118
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
+  } else {
+new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), 
seed)
+  }
+  i += 1
+}
+partitions.asInstanceOf[Array[Partition]]

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-23 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15275100
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
+  } else {
+new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), 
seed)
+  }
+  i += 1
+}
+partitions.asInstanceOf[Array[Partition]]

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15271948
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val normal = new StandardNormalGenerator()
+randomRDD(sc, size, numPartitions, normal, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  mean: Double,
+  seed: Long): RDD[Double] = {
+val poisson = new PoissonGenerator(mean)
+randomRDD(sc, size, numPartitions, poisson, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): 
RDD[Double] = {
+poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: 
Double): RDD[Double] = {
+poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] 
= {
+poissonRDD(sc, size, sc.defaultParallelism, mean, 
Utils.random.nextLong)
+  }
+
+  def randomRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  distribution: DistributionGenerator,
--- End diff --

Which do you prefer, `rng` or `generator`? `rng` feels like an instance of 
`RandomNumberGenerator`. As we discussed, `DistributionGenerator` is different 
from `RandomNumberGenerator`.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15271907
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
--- End diff --

So let us not put more than `Int.MaxValue` items into a single partition. If
the size is beyond this limit, we should throw an error. Different from
`collect`, `glom` converts `RDD[T]` to `RDD[Array[T]]`, coalescing all elements
within each partition into an array. `glom` is not frequently used, but we
should support `cache`, which means we shouldn't allow more than `Int.MaxValue`
items in a single partition.
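
A minimal sketch of the `glom` behavior described here, assuming a live `sc`:

~~~
// glom() coalesces each partition into one Array, so a partition's contents
// must fit in a single JVM array (at most Int.MaxValue elements).
val rdd = sc.parallelize(1 to 10, 2)     // RDD[Int] with 2 partitions
val arrays = rdd.glom()                  // RDD[Array[Int]]
arrays.collect().foreach(a => println(a.mkString(",")))
// prints "1,2,3,4,5" and "6,7,8,9,10"
~~~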




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15271812
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
--- End diff --

It feels safer if people see the following lines in the same group:

~~~
val thisGenerator = generator.copy()
thisGenerator.setSeed(seed)
~~~

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15266943
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val normal = new StandardNormalGenerator()
+randomRDD(sc, size, numPartitions, normal, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  mean: Double,
+  seed: Long): RDD[Double] = {
+val poisson = new PoissonGenerator(mean)
+randomRDD(sc, size, numPartitions, poisson, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): 
RDD[Double] = {
+poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: 
Double): RDD[Double] = {
+poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] 
= {
+poissonRDD(sc, size, sc.defaultParallelism, mean, 
Utils.random.nextLong)
+  }
+
+  def randomRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  distribution: DistributionGenerator,
--- End diff --

copypasta fail. meant to change all of them to `rng`.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15266470
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
--- End diff --

Switching to the slicing logic from `ParallelCollectionRDD` for more 
balanced partition sizes.
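
For reference, `ParallelCollectionRDD`-style slicing assigns partition `i` the
range `[i * size / numSlices, (i + 1) * size / numSlices)`, so sizes differ by
at most one; a minimal sketch:

~~~
def sliceSizes(size: Long, numSlices: Int): Seq[Long] =
  (0 until numSlices).map { i =>
    (i + 1) * size / numSlices - i * size / numSlices
  }

// sliceSizes(10, 3) == Seq(3, 3, 4)
// versus the quoted scheme: first partition 10 / 3 + 10 % 3 = 4, others 3
~~~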




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15265929
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
--- End diff --

Or I could just add `.newInstance()` in line 118, which I prefer since it's
not necessary for us to have a new rng every time the RDD is computed.

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15265778
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
+  } else {
+new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), 
seed)
+  }
+  i += 1
+}
+partitions.asInstanceO

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15265484
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
+}
+
+/**
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+class UniformGenerator() extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary 
here.
+  private val random = new XORShiftRandom()
+
+  /**
+   * @return An i.i.d sample as a Double from U[0.0, 1.0].
+   */
+  override def nextValue(): Double = {
+random.nextDouble()
+  }
+
+  /** Set random seed. */
+  override def setSeed(seed: Long) = random.setSeed(seed)
+
+  override def newInstance(): UniformGenerator = new UniformGenerator()
+}
+
+/**
+ * Generates i.i.d. samples from the Standard Normal Distribution.
+ */
+class StandardNormalGenerator() extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary 
here.
+  private val random = new XORShiftRandom()
--- End diff --

As with most random objects, the DistributionGenerator should be created 
with a default seed (so using it before calling `setSeed` is legal). I like how 
in Colt it's called `reseed` instead, but `setSeed` is also widely adopted.
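
A small sketch of that contract, using the generator API from the quoted diff:

~~~
val gen = new UniformGenerator() // usable immediately thanks to the default seed
val a = gen.nextValue()
gen.setSeed(42L)                 // reseed for reproducible sequences
val b = gen.nextValue()
~~~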




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15265297
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
--- End diff --

There are no restrictions on the size of each Partition from the trait (in
fact it doesn't even need to have a size). The restriction of `size <=
Int.MaxValue` applies when the RDD is cached. In the case of `glom`, the size
of the entire RDD needs to be `<= Int.MaxValue` since that's the max size of an
array.
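
Under that reasoning, a guard along these lines (illustrative only; not code
from the PR) would surface the limit early:

~~~
// Hypothetical guard: per-partition sizes beyond Int.MaxValue cannot be
// materialized as a single JVM array by cache or glom.
def checkPartitionSize(partitionSize: Long): Unit =
  require(partitionSize <= Int.MaxValue,
    s"Partition size $partitionSize exceeds Int.MaxValue.")
~~~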




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15262023
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
--- End diff --

Ideally, `newInstance` would be an abstract static method on 
DistributionGenerator that takes an instance as an argument, to better express 
the fact that we're copying the class members into the new instance. But since 
abstract static methods in interfaces aren't really supported in Scala (a 
combination of a trait and a companion object would be messy for users to 
implement), `copy` will do nicely here.
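
To make the copy semantics concrete, here is a sketch (my illustration, not 
code from this PR) of how an instance-level method carries constructor 
parameters into the fresh copy, which a static factory on a trait can't 
express cleanly:

~~~
import org.apache.spark.mllib.random.DistributionGenerator

// Hypothetical generator: newInstance() re-creates the object with the
// same constructor parameter (mean) but a fresh, independent RNG.
class ScaledGen(val mean: Double) extends DistributionGenerator {
  private val random = new java.util.Random()
  override def nextValue(): Double = random.nextDouble() * mean // placeholder draw
  override def setSeed(seed: Long) = random.setSeed(seed)
  override def newInstance(): ScaledGen = new ScaledGen(mean)
}
~~~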




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread falaki
Github user falaki commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15246377
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
--- End diff --

Alternatively, we can switch to a generator (builder) model; Doris suggested it 
in her TODO. What do you guys think?
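
A sketch of the builder idea (hypothetical API, not code from this PR; it 
delegates to the existing `RandomRDDGenerators` overload):

~~~
import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDGenerators
import org.apache.spark.rdd.RDD

class RandomRDDBuilder(sc: SparkContext) {
  // Optional settings become chained setters with sensible defaults.
  private var numPartitions: Int = sc.defaultParallelism
  private var seed: Long = scala.util.Random.nextLong()

  def setNumPartitions(n: Int): this.type = { numPartitions = n; this }
  def setSeed(s: Long): this.type = { seed = s; this }

  def uniformRDD(size: Long): RDD[Double] =
    RandomRDDGenerators.uniformRDD(sc, size, numPartitions, seed)
}

// Usage: new RandomRDDBuilder(sc).setNumPartitions(8).setSeed(42L).uniformRDD(1000000L)
~~~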




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15215522
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
+}
+
+/**
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+class UniformGenerator() extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary 
here.
+  private val random = new XORShiftRandom()
+
+  /**
+   * @return An i.i.d sample as a Double from U[0.0, 1.0].
+   */
+  override def nextValue(): Double = {
+random.nextDouble()
+  }
+
+  /** Set random seed. */
+  override def setSeed(seed: Long) = random.setSeed(seed)
+
+  override def newInstance(): UniformGenerator = new UniformGenerator()
+}
+
+/**
+ * Generates i.i.d. samples from the Standard Normal Distribution.
+ */
+class StandardNormalGenerator() extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary 
here.
+  private val random = new XORShiftRandom()
--- End diff --

Is it allowed to use a DistributionGenerator before calling setSeed? It 
would seem simpler to disallow that, but that behavior seems to be inherited 
from the Pseudorandom trait.
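
For reference, the call pattern the RDD seems to rely on (my sketch of the 
intent, not a quote from the PR): each partition gets its own generator copy 
and is seeded before any draw, so `nextValue()` is never used with an unset 
seed.

~~~
import org.apache.spark.mllib.random.{DistributionGenerator, UniformGenerator}

val baseSeed = 42L       // assumed driver-side seed
val partitionIndex = 3   // assumed partition id
val rng: DistributionGenerator = new UniformGenerator()
val partitionRng = rng.newInstance()            // fresh RNG state per partition
partitionRng.setSeed(baseSeed + partitionIndex) // seed before drawing
val sample = Iterator.fill(5)(partitionRng.nextValue()).toList
~~~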




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-22 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15215432
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
+  } else {
+new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), 
seed)
+  }
+  i += 1
+}
+partitions.asInstanceOf[Array[Partition]]

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213097
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
+val otherPartitionSize = size / numSlices
+
+val partitions = new Array[RandomRDDPartition](numSlices)
+var i = 0
+while (i < numSlices) {
+  partitions(i) =  if (i == 0) {
+new RandomRDDPartition(i, firstPartitionSize, rng, seed)
--- End diff --

It is safer to make a copy in `compute` than here. In local mode, this may 
cause problems if a user uses the same generator to create two random RDDs.

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213078
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
--- End diff --

Could you double check whether we can have more than `Int.MaxValue` items 
in a single partition? It may break storage and a couple of RDD functions like 
`glom`.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213091
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
--- End diff --

If we can't have more than `Int.MaxValue` items per iterator, this is just 
`Iterator.fill(numElem)(rng.nextValue())`.
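
Spelled out (a sketch; `Iterator.fill` takes an `Int` count, so the `Long` 
has to be guarded and cast):

~~~
import org.apache.spark.mllib.random.DistributionGenerator

def pointIterator(numElem: Long, rng: DistributionGenerator): Iterator[Double] = {
  require(numElem <= Int.MaxValue, "Iterator.fill takes an Int count")
  Iterator.fill(numElem.toInt)(rng.nextValue())
}
~~~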




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213082
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
--- End diff --

You can put `override` directly in the constructor.
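
I.e., something like this sketch (visibility modifier omitted):

~~~
import org.apache.spark.Partition
import org.apache.spark.mllib.random.DistributionGenerator

// `override val index` in the constructor replaces the separate `idx`
// field and the explicit override in the class body.
class RandomRDDPartition(
    override val index: Int,
    val size: Long,
    val rng: DistributionGenerator,
    val seed: Long) extends Partition
~~~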




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213096
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala 
---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.mllib.linalg.{DenseVector, Vector}
+import org.apache.spark.mllib.random.DistributionGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+private[mllib] class RandomRDDPartition(val idx: Int,
+val size: Long,
+val rng: DistributionGenerator,
+val seed: Long) extends Partition {
+
+  override val index: Int = idx
+
+}
+
+// These two classes are necessary since Range objects in Scala cannot 
have size > Int.MaxValue
+private[mllib] class RandomRDD(@transient private var sc: SparkContext,
+size: Long,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Double] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getPointIterator(split)
+  }
+
+  override def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] class RandomVectorRDD(@transient private var sc: 
SparkContext,
+size: Long,
+vectorSize: Int,
+numSlices: Int,
+@transient rng: DistributionGenerator,
+@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, 
Nil) {
+
+  require(size > 0, "Positive RDD size required.")
+  require(numSlices > 0, "Positive number of partitions required")
+  require(vectorSize > 0, "Positive vector size required.")
+
+  override def compute(splitIn: Partition, context: TaskContext): 
Iterator[Vector] = {
+val split = splitIn.asInstanceOf[RandomRDDPartition]
+RandomRDD.getVectorIterator(split, vectorSize)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+RandomRDD.getPartitions(size, numSlices, rng, seed)
+  }
+}
+
+private[mllib] object RandomRDD {
+
+  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: 
DistributionGenerator)
+extends Iterator[Double] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Double = {
+  currentSize += 1
+  rng.nextValue()
+}
+  }
+
+  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
+  val vectorSize: Int,
+  val rng: DistributionGenerator)
+extends Iterator[Vector] {
+
+private var currentSize = 0
+
+override def hasNext: Boolean = currentSize < numElem
+
+override def next(): Vector = {
+  currentSize += 1
+  new DenseVector((0 until vectorSize).map { _ => rng.nextValue() 
}.toArray)
+}
+  }
+
+  def getPartitions(size: Long,
+  numSlices: Int,
+  rng: DistributionGenerator,
+  seed: Long): Array[Partition] = {
+
+val firstPartitionSize = size / numSlices + size % numSlices
--- End diff --

This is not evenly distributed. For example, when `size = 1000` and 
`numSlices = 101`, the first partition gets `9 + 91 = 100` elements while each 
of the other 100 partitions gets only 9.
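
A more even split would spread the remainder over the first 
`size % numSlices` partitions, for example (a sketch, not the PR's eventual 
fix):

~~~
def partitionSizes(size: Long, numSlices: Int): Array[Long] = {
  val base = size / numSlices
  val remainder = size % numSlices
  // The first `remainder` partitions take one extra element each.
  Array.tabulate(numSlices)(i => if (i < remainder) base + 1 else base)
}

// partitionSizes(1000L, 101) yields 91 partitions of 10 and 10 of 9,
// instead of one partition of 100 and 100 partitions of 9.
~~~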




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213069
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
--- End diff --

It is very confusing to have both `(SparkContext, Long, Int)` and 
`(SparkContext, Long, Long)`. If a user doesn't notice the 
`(SparkContext, Long, Int)` overload, treats the third argument as the seed, 
and passes an integer, it actually sets the number of partitions. Maybe we 
should only allow default values at the end. That is,

~~~
def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long)

def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int)
~~~
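
For example (assuming a `SparkContext` named `sc` is in scope):

~~~
import org.apache.spark.mllib.random.RandomRDDGenerators

// The caller means 42 as a seed, but the Int literal selects the
// numPartitions overload; only 42L selects the seed overload.
val byPartitions = RandomRDDGenerators.uniformRDD(sc, 1000L, 42)  // numPartitions = 42
val bySeed = RandomRDDGenerators.uniformRDD(sc, 1000L, 42L)       // seed = 42
~~~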




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213063
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
+}
+
+/**
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+class UniformGenerator() extends DistributionGenerator {
--- End diff --

Is `()` necessary?




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213059
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
--- End diff --

Change `@return` to `Returns`. Otherwise the summary will be empty in the 
generated docs.
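
I.e., a summary sentence in place of the bare tag (a sketch, with the trait 
trimmed to the one method):

~~~
import org.apache.spark.util.random.Pseudorandom

trait DistributionGenerator extends Pseudorandom with Serializable {
  /**
   * Returns an i.i.d. sample as a Double from an underlying distribution.
   */
  def nextValue(): Double
}
~~~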




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213064
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
+}
+
+/**
+ * Generates i.i.d. samples from U[0.0, 1.0]
+ */
+class UniformGenerator() extends DistributionGenerator {
+
+  // XORShiftRandom for better performance. Thread safety isn't necessary 
here.
+  private val random = new XORShiftRandom()
+
+  /**
+   * @return An i.i.d sample as a Double from U[0.0, 1.0].
+   */
+  override def nextValue(): Double = {
+random.nextDouble()
+  }
+
+  /** Set random seed. */
--- End diff --

Doc is not necessary for the overridden methods, unless you want to update 
it.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213072
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
+
+// TODO add Scaladocs once API fully approved
+// Alternatively, we can use the generator pattern to set numPartitions, 
seed, etc instead to bring
+// down the number of methods here.
+object RandomRDDGenerators {
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val uniform = new UniformGenerator()
+randomRDD(sc, size, numPartitions, uniform, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
+uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: 
Long): RDD[Double] = {
+val normal = new StandardNormalGenerator()
+randomRDD(sc, size, numPartitions, normal, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, seed)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): 
RDD[Double] = {
+normalRDD(sc, size, numPartitions, Utils.random.nextLong)
+  }
+
+  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
+normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  mean: Double,
+  seed: Long): RDD[Double] = {
+val poisson = new PoissonGenerator(mean)
+randomRDD(sc, size, numPartitions, poisson, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): 
RDD[Double] = {
+poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: 
Double): RDD[Double] = {
+poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
+  }
+
+  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] 
= {
+poissonRDD(sc, size, sc.defaultParallelism, mean, 
Utils.random.nextLong)
+  }
+
+  def randomRDD(sc: SparkContext,
+  size: Long,
+  numPartitions: Int,
+  distribution: DistributionGenerator,
--- End diff --

We need to be consistent on the argument name: `distribution` is used here 
but `rng` is used in `randomVectorRDD`. `generator` sounds better to me than 
`distribution`, because a `DistributionGenerator` is a generator, not a 
distribution, and in this context we don't need to spell out 
`distributionGenerator`.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213061
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
--- End diff --

`partition` has no context here. Maybe simply mention that this is for 
running multiple instances concurrently.




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1520#discussion_r15213062
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.random
+
+import cern.jet.random.Poisson
+import cern.jet.random.engine.DRand
+
+import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
+
+/**
+ * Trait for random number generators that generate i.i.d values from a 
distribution.
+ */
+trait DistributionGenerator extends Pseudorandom with Serializable {
+
+  /**
+   * @return An i.i.d sample as a Double from an underlying distribution.
+   */
+  def nextValue(): Double
+
+  /**
+   * @return A copy of the DistributionGenerator with a new instance of 
the rng object used in the
+   * class when applicable. Each partition has a unique seed and 
therefore requires its
+   * own instance of the DistributionGenerator.
+   */
+  def newInstance(): DistributionGenerator
--- End diff --

I saw your argument about using `clone`. Between `copy` and `newInstance`, 
I think `copy` is better. For example, in Poisson, we need to copy the mean, 
which is not reflected in `newInstance`.
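
Concretely, for the Poisson generator the copy has to carry the mean while 
the RNG state starts fresh. A sketch against the trait as currently written 
(whatever the method ends up being called):

~~~
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand

import org.apache.spark.mllib.random.DistributionGenerator

class PoissonGenerator(val mean: Double) extends DistributionGenerator {
  private var rng = new Poisson(mean, new DRand)

  override def nextValue(): Double = rng.nextInt()

  override def setSeed(seed: Long) {
    rng = new Poisson(mean, new DRand(seed.toInt))
  }

  // The constructor parameter `mean` travels with the copy; only the
  // underlying RNG state is new.
  override def newInstance(): PoissonGenerator = new PoissonGenerator(mean)
}
~~~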




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-49700088
  
QA results for PR 1520:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  trait DistributionGenerator extends Pseudorandom with Serializable {
  class UniformGenerator() extends DistributionGenerator {
  class StandardNormalGenerator() extends DistributionGenerator {
  class PoissonGenerator(val mean: Double) extends DistributionGenerator {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread dorx
Github user dorx commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-49695577
  
@falaki @jkbradley @mengxr




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1520#issuecomment-49695427
  
QA tests have started for PR 1520. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

2014-07-21 Thread dorx
GitHub user dorx opened a pull request:

https://github.com/apache/spark/pull/1520

[SPARK-2514] [mllib] Random RDD generator

Utilities for generating random RDDs.

RandomRDD and RandomVectorRDD are created instead of using 
`sc.parallelize(range: Range)` because `Range` objects in Scala can only have 
`size <= Int.MaxValue`.

The object `RandomRDDGenerators` can be transformed into a generator class 
to reduce the number of auxiliary methods for optional arguments. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dorx/spark randomRDD

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1520


commit 888144416ced2b6d4c4839ac95b8a3feb2b3aba1
Author: Doris Xin 
Date:   2014-07-12T01:02:01Z

RandomRDDGenerator: initial design

Looking for feedback on design decisions. Very rough draft and untested.

commit 7cb0e406793db493cee72cb91ec02475c95c8de7
Author: Doris Xin 
Date:   2014-07-12T01:15:56Z

fix for data inconsistency

commit 49ed20d9a30b0ba5d809974bbcf48cc76a45d68e
Author: Doris Xin 
Date:   2014-07-12T01:30:15Z

alternative poisson distribution generator

commit f46d928c4e3e71ced4ede9295ef645fb714c9a69
Author: Doris Xin 
Date:   2014-07-19T02:13:58Z

WIP

commit df5bcffc320bab85f6c5925b244fe9885d6d0eb5
Author: Doris Xin 
Date:   2014-07-21T07:47:07Z

Merge branch 'generator' into randomRDD

commit 92d6f1c3ca0f22371f7f0387b875ac16d5030ffb
Author: Doris Xin 
Date:   2014-07-21T07:48:12Z

solution for Cloneable

commit d56cacbde7a0550f53b59696ad7c7014c827f3f7
Author: Doris Xin 
Date:   2014-07-22T01:23:19Z

impl with RandomRDD

commit bc90234c9639bfb3f4581af63cf4bf370c61e18b
Author: Doris Xin 
Date:   2014-07-22T03:37:40Z

units passed.

commit aec68eb167ac9f11c64d95c698009cbf8919bd4b
Author: Doris Xin 
Date:   2014-07-22T03:42:31Z

newline

commit 063ea0b48b769f7f8477ca2364f8e676f93c297e
Author: Doris Xin 
Date:   2014-07-22T03:43:57Z

Merge branch 'master' into randomRDD



