[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1520 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50289239 LGTM. Merged into master. Thanks for adding random RDD generators!!
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50215914 QA results for PR 1520:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  - trait DistributionGenerator extends Pseudorandom with Serializable
  - class UniformGenerator extends DistributionGenerator
  - class StandardNormalGenerator extends DistributionGenerator
  - class PoissonGenerator(val mean: Double) extends DistributionGenerator
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull
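The QA bot lists the new public API: a `DistributionGenerator` trait plus uniform, standard normal, and Poisson generators, which the test suites quoted in this thread drive through `setSeed`, `nextValue()`, and `copy()`. Below is a minimal, dependency-free sketch of that shape, assuming only `java.util.Random`. The `Simple*` names are hypothetical, the trait does not extend Spark's `Pseudorandom`, and the Poisson sampler uses Knuth's multiplication method, which is not necessarily what the PR uses.

```scala
import java.util.Random

// Hypothetical, dependency-free stand-in for the DistributionGenerator shape
// described in the PR; the real trait extends Spark's Pseudorandom instead.
trait SimpleDistributionGenerator extends Serializable {
  /** Returns the next sample from the distribution. */
  def nextValue(): Double

  /** Reseeds the underlying RNG so a sequence can be reproduced. */
  def setSeed(seed: Long): Unit

  /** Returns a new instance backed by its own, independent RNG. */
  def copy(): SimpleDistributionGenerator
}

class SimpleUniformGenerator extends SimpleDistributionGenerator {
  private val rng = new Random()
  override def nextValue(): Double = rng.nextDouble() // sample from U[0, 1)
  override def setSeed(seed: Long): Unit = rng.setSeed(seed)
  override def copy(): SimpleUniformGenerator = new SimpleUniformGenerator()
}

class SimpleStandardNormalGenerator extends SimpleDistributionGenerator {
  private val rng = new Random()
  override def nextValue(): Double = rng.nextGaussian() // sample from N(0, 1)
  override def setSeed(seed: Long): Unit = rng.setSeed(seed)
  override def copy(): SimpleStandardNormalGenerator = new SimpleStandardNormalGenerator()
}

// Knuth's multiplication method: multiply uniforms until the product drops to
// exp(-mean) or below. O(mean) work per sample, so only suitable for small
// means (exp(-mean) underflows to 0.0 for means above roughly 745).
class SimplePoissonGenerator(val mean: Double) extends SimpleDistributionGenerator {
  require(mean >= 0.0, "Poisson mean must be non-negative.")
  private val rng = new Random()
  override def nextValue(): Double = {
    val limit = math.exp(-mean)
    var k = 0
    var product = rng.nextDouble()
    while (product > limit) {
      k += 1
      product *= rng.nextDouble()
    }
    k.toDouble
  }
  override def setSeed(seed: Long): Unit = rng.setSeed(seed)
  override def copy(): SimplePoissonGenerator = new SimplePoissonGenerator(mean)
}
```

With `mean = 0.0` the limit is `1.0` and every uniform draw is below it, so the sampler always returns `0.0`. This matches the remark in the test suite that a zero mean cannot pass the reseeding checks.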
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50213682 QA tests have started for PR 1520. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50213475 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50208419 QA results for PR 1520:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  - trait DistributionGenerator extends Pseudorandom with Serializable
  - class UniformGenerator extends DistributionGenerator
  - class StandardNormalGenerator extends DistributionGenerator
  - class PoissonGenerator(val mean: Double) extends DistributionGenerator
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50204159 QA tests have started for PR 1520. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15423644 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.scalatest.FunSuite + +import org.apache.spark.util.StatCounter + +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged +class DistributionGeneratorSuite extends FunSuite { + + def apiChecks(gen: DistributionGenerator) { + +// resetting seed should generate the same sequence of random numbers +gen.setSeed(42L) +val array1 = (0 until 1000).map(_ => gen.nextValue()) +gen.setSeed(42L) +val array2 = (0 until 1000).map(_ => gen.nextValue()) +assert(array1.equals(array2)) + +// newInstance should contain a difference instance of the rng +// i.e. setting difference seeds for difference instances produces different sequences of +// random numbers. 
+val gen2 = gen.copy() +gen.setSeed(0L) +val array3 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(1L) +val array4 = (0 until 1000).map(_ => gen2.nextValue()) +// Compare arrays instead of elements since individual elements can coincide by chance but the +// sequences should differ given two different seeds. +assert(!array3.equals(array4)) + +// test that setting the same seed in the copied instance produces the same sequence of numbers +gen.setSeed(0L) +val array5 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(0L) +val array6 = (0 until 1000).map(_ => gen2.nextValue()) +assert(array5.equals(array6)) + } + + def distributionChecks(gen: DistributionGenerator, + mean: Double = 0.0, + stddev: Double = 1.0, + epsilon: Double = 1e-3) { +for (seed <- 0 until 5) { + gen.setSeed(seed.toLong) + val sample = (0 until 1000).map { _ => gen.nextValue()} --- End diff -- Old tests ran for 21 s on my laptop. Shaved off a couple of 0s everywhere and now it finishes in 986 milliseconds.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-50120556 @dorx Besides the comments, could you mark the distribution generators and the methods that require distribution generators `@Experimental`? Part of the reason is that we don't have this API in Python, and it is not clear whether we should implement the same in Python.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15390040 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, +val size: Int, +val generator: DistributionGenerator, +val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size + require(size > 0, "Positive partition size required.") +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, --- End diff -- Could it be a `val`?
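The quoted `RandomRDDPartition` guards against a `Long` size above `Int.MaxValue` being narrowed to an `Int`. The plain-Scala sketch below shows why a post-cast `size > 0` check catches only some of those cases: `toInt` keeps the low 32 bits, so the result can be negative, zero, or a small positive number. The object and method names are hypothetical.

```scala
object SizeTruncationSketch {
  // Narrowing a Long to an Int keeps only its low 32 bits.
  def narrow(size: Long): Int = size.toInt

  // Mirrors the quoted check: it validates the value after the cast, so it
  // rejects zero and negative results but lets small positive ones through.
  def postCastCheck(size: Long): Int = {
    val narrowed = size.toInt
    require(narrowed > 0, "Positive partition size required.")
    narrowed
  }

  // Validating the Long before narrowing rejects every out-of-range size.
  def preCastCheck(size: Long): Int = {
    require(size > 0 && size <= Int.MaxValue,
      s"Partition size must be in [1, ${Int.MaxValue}], got $size.")
    size.toInt
  }
}
```

For example, `(1L << 32) + 3L` narrows to `3`, which passes the post-cast check. Checking the `Long` before narrowing is the stricter of the two options.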
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389923 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.scalatest.FunSuite + +import org.apache.spark.util.StatCounter + +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged +class DistributionGeneratorSuite extends FunSuite { + + def apiChecks(gen: DistributionGenerator) { + +// resetting seed should generate the same sequence of random numbers +gen.setSeed(42L) +val array1 = (0 until 1000).map(_ => gen.nextValue()) +gen.setSeed(42L) +val array2 = (0 until 1000).map(_ => gen.nextValue()) +assert(array1.equals(array2)) + +// newInstance should contain a difference instance of the rng +// i.e. setting difference seeds for difference instances produces different sequences of +// random numbers. 
+val gen2 = gen.copy() +gen.setSeed(0L) +val array3 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(1L) +val array4 = (0 until 1000).map(_ => gen2.nextValue()) +// Compare arrays instead of elements since individual elements can coincide by chance but the +// sequences should differ given two different seeds. +assert(!array3.equals(array4)) + +// test that setting the same seed in the copied instance produces the same sequence of numbers +gen.setSeed(0L) +val array5 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(0L) +val array6 = (0 until 1000).map(_ => gen2.nextValue()) +assert(array5.equals(array6)) + } + + def distributionChecks(gen: DistributionGenerator, + mean: Double = 0.0, + stddev: Double = 1.0, + epsilon: Double = 1e-3) { +for (seed <- 0 until 5) { + gen.setSeed(seed.toLong) + val sample = (0 until 1000).map { _ => gen.nextValue()} --- End diff -- Just curious, how long does the entire test suite take to finish?
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389784 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. 
+ * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +val stats = rdd.stats() +assert(expectedSize === stats.count) +assert(expectedNumPartitions === rdd.partitions.size) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +assert(expectedNumPartitions === rdd.partitions.size) +val values = new ArrayBuffer[Double]() +rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray +}} +assert(expectedRows === values.size / expectedColumns) +val stats = new StatCounter(values) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("RandomRDD sizes") { + +// some cases where size % numParts != 0 to test getPartitions behaves correctly +for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) { + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble) + val partStats = new StatCounter(partSizes) + assert(partStats.stdev < 1.0) +} + +// size > Int.MaxValue +val size = Int.MaxValue.toLong * 100L +val numPartitions = 101 +val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) 
+assert(rdd.partitions.size === numPartitions) +val count = rdd.partitions.foldLeft(0L){ + (count, part) => count + part.asInstanceOf[RandomRDDPartition].size +} +assert(count === size) + +// size needs to be positive +try { + new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) --- End diff -- You can use `intercept[IllegalArgumentException] { ... }`.
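ScalaTest's `intercept[T] { ... }` runs the block and returns the thrown exception if it is a `T`. If nothing is thrown, or an exception of a different type is thrown, the test fails. Inside the suite, the quoted check would read `intercept[IllegalArgumentException] { new RandomRDD(sc, 0, 10, new UniformGenerator, 0L) }`.

The sketch below re-implements the same shape by hand so that it runs without ScalaTest or Spark. `InterceptSketch` and `validateSize` are hypothetical names; the real suite should call ScalaTest's `intercept` directly.

```scala
import scala.reflect.ClassTag

object InterceptSketch {
  // Hand-rolled imitation of the shape of ScalaTest's intercept[T] { ... }:
  // run the block and return the exception if it is a T; otherwise fail.
  def intercept[T <: Throwable](block: => Any)(implicit ct: ClassTag[T]): T = {
    val expected = ct.runtimeClass
    val thrown: Option[Throwable] =
      try {
        block
        None
      } catch {
        case t: Throwable => Some(t)
      }
    thrown match {
      case Some(t) if expected.isInstance(t) => t.asInstanceOf[T]
      case Some(t) =>
        throw new AssertionError(s"Expected ${expected.getName} but got $t", t)
      case None =>
        throw new AssertionError(s"Expected ${expected.getName} but nothing was thrown")
    }
  }

  // Hypothetical stand-in for a positive-size check like the one the quoted test exercises.
  def validateSize(size: Long): Long = {
    require(size > 0, "Positive RDD size required.")
    size
  }
}
```

Compared with a hand-written `try`/`catch`, `intercept` also fails when no exception is thrown. A bare `try` block silently passes in that case unless it ends with an explicit `fail`.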
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389761 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. 
+ * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +val stats = rdd.stats() +assert(expectedSize === stats.count) +assert(expectedNumPartitions === rdd.partitions.size) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +assert(expectedNumPartitions === rdd.partitions.size) +val values = new ArrayBuffer[Double]() +rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray +}} +assert(expectedRows === values.size / expectedColumns) +val stats = new StatCounter(values) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("RandomRDD sizes") { + +// some cases where size % numParts != 0 to test getPartitions behaves correctly +for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) { + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble) + val partStats = new StatCounter(partSizes) + assert(partStats.stdev < 1.0) +} + +// size > Int.MaxValue +val size = Int.MaxValue.toLong * 100L +val numPartitions = 101 +val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) 
+assert(rdd.partitions.size === numPartitions) +val count = rdd.partitions.foldLeft(0L){ + (count, part) => count + part.asInstanceOf[RandomRDDPartition].size --- End diff -- Move `(count, part) =>` to the line above and insert a space between `)` and `{`.
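The suggested layout keeps the function literal's parameters on the same line as the opening brace. Below is a small stand-alone sketch that uses a hypothetical `Part` case class in place of `RandomRDDPartition`. Starting the fold at `0L` keeps the running total a `Long`, which matters here because the test sums partition sizes past `Int.MaxValue`.

```scala
object PartitionSizeSum {
  // Hypothetical stand-in for RandomRDDPartition with only the fields used here.
  final case class Part(index: Int, size: Int)

  // Layout suggested in review: `(count, part) =>` on the line with the
  // opening brace, and a space between `)` and `{`. Seeding the fold with 0L
  // makes the accumulator a Long, so the total cannot overflow Int.
  def totalSize(parts: Seq[Part]): Long =
    parts.foldLeft(0L) { (count, part) =>
      count + part.size
    }
}
```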
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389694 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. 
+ * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +val stats = rdd.stats() +assert(expectedSize === stats.count) +assert(expectedNumPartitions === rdd.partitions.size) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +assert(expectedNumPartitions === rdd.partitions.size) +val values = new ArrayBuffer[Double]() +rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray +}} +assert(expectedRows === values.size / expectedColumns) +val stats = new StatCounter(values) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("RandomRDD sizes") { + +// some cases where size % numParts != 0 to test getPartitions behaves correctly +for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) { + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble) + val partStats = new StatCounter(partSizes) + assert(partStats.stdev < 1.0) --- End diff -- Why check the stdev of partition sizes? The check should be `maxPartitionSize - minPartitionSize <= 1`.
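The reviewer's criterion can be stated directly: partitions are balanced when the largest and smallest sizes differ by at most one. Below is a sketch of one way to produce such sizes from a `Long` total. Each partition gets `size / numPartitions` elements, and the `size % numPartitions` leftovers go one apiece to the first partitions. `BalancedPartitions`, `partitionSizes`, and `isBalanced` are hypothetical, and this is not necessarily how the PR's `getPartitions` splits the range.

```scala
object BalancedPartitions {
  // Splits `size` elements into `numPartitions` Int-sized partitions whose
  // sizes differ by at most one. Partitions can be empty when size < numPartitions.
  def partitionSizes(size: Long, numPartitions: Int): Array[Int] = {
    require(size > 0, "Positive RDD size required.")
    require(numPartitions > 0, "Positive number of partitions required.")
    val base = size / numPartitions      // every partition gets at least this many
    val remainder = size % numPartitions // the first `remainder` partitions get one extra
    require(base + (if (remainder > 0) 1 else 0) <= Int.MaxValue,
      "Partition size exceeds Int.MaxValue; use more partitions.")
    Array.tabulate(numPartitions) { i =>
      (base + (if (i < remainder) 1 else 0)).toInt
    }
  }

  // The check suggested in review: sizes are balanced when max - min <= 1.
  def isBalanced(sizes: Array[Int]): Boolean = sizes.max - sizes.min <= 1
}
```

Unlike a bound on the standard deviation, `max - min <= 1` is exact for any number of partitions and states the intended invariant directly.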
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389645 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, +val size: Int, +val generator: DistributionGenerator, +val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size + require(size > 0, "Positive partition size required.") +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, --- End diff -- `numSlices` -> `numPartitions` to match the naming in previous code? 
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389677 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. + * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ --- End diff -- Those comments are not JavaDoc. Could you move them inside the class body and change the first line from `/**` to `/*`?
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389671 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.scalatest.FunSuite + +import org.apache.spark.util.StatCounter + +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged +class DistributionGeneratorSuite extends FunSuite { + + def apiChecks(gen: DistributionGenerator) { + +// resetting seed should generate the same sequence of random numbers +gen.setSeed(42L) +val array1 = (0 until 1000).map(_ => gen.nextValue()) +gen.setSeed(42L) +val array2 = (0 until 1000).map(_ => gen.nextValue()) +assert(array1.equals(array2)) + +// newInstance should contain a difference instance of the rng +// i.e. setting difference seeds for difference instances produces different sequences of +// random numbers. 
+val gen2 = gen.copy() +gen.setSeed(0L) +val array3 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(1L) +val array4 = (0 until 1000).map(_ => gen2.nextValue()) +// Compare arrays instead of elements since individual elements can coincide by chance but the +// sequences should differ given two different seeds. +assert(!array3.equals(array4)) + +// test that setting the same seed in the copied instance produces the same sequence of numbers +gen.setSeed(0L) +val array5 = (0 until 1000).map(_ => gen.nextValue()) +gen2.setSeed(0L) +val array6 = (0 until 1000).map(_ => gen2.nextValue()) +assert(array5.equals(array6)) + } + + def distributionChecks(gen: DistributionGenerator, + mean: Double = 0.0, + stddev: Double = 1.0, + epsilon: Double = 1e-3) { +for (seed <- 0 until 5) { + gen.setSeed(seed.toLong) + val sample = (0 until 1000).map { _ => gen.nextValue()} + val stats = new StatCounter(sample) + assert(math.abs(stats.mean - mean) < epsilon) + assert(math.abs(stats.stdev - stddev) < epsilon) +} + } + + test("UniformGenerator") { +val uniform = new UniformGenerator() +apiChecks(uniform) +// Stddev of uniform distribution = (ub - lb) / math.sqrt(12) +distributionChecks(uniform, 0.5, 1 / math.sqrt(12)) + } + + test("StandardNormalGenerator") { +val normal = new StandardNormalGenerator() +apiChecks(normal) +distributionChecks(normal, 0.0, 1.0) + } + + test("PoissonGenerator") { +// mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. +for (mean <- List(1.0, 5.0, 100.0)) { + val poisson = new PoissonGenerator(mean) + apiChecks(poisson) + distributionChecks(poisson, mean, math.sqrt(mean), 1e-2) +} + } +} + --- End diff -- remove extra empty line
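The contract exercised by `apiChecks` can be sketched without Spark: reseeding replays the same sequence, and `copy()` yields an independent generator that replays the same sequence under the same seed. This is a minimal sketch under stated assumptions. `SimpleGenerator`, `SimpleUniform`, and `ApiCheck` are hypothetical names, and `scala.util.Random` stands in for the PR's `XORShiftRandom`. Because `(0 until n).map(...)` returns an `IndexedSeq`, `==` (like `equals` in the quoted test) compares elements, not references.

```scala
import scala.util.Random

// Hypothetical stand-in for the PR's DistributionGenerator trait; only the
// three methods exercised by apiChecks are modeled here.
trait SimpleGenerator extends Serializable {
  def nextValue(): Double
  def setSeed(seed: Long): Unit
  def copy(): SimpleGenerator
}

// U[0, 1) generator backed by scala.util.Random (assumption: the PR uses XORShiftRandom).
class SimpleUniform extends SimpleGenerator {
  private val random = new Random()
  override def nextValue(): Double = random.nextDouble()
  override def setSeed(seed: Long): Unit = random.setSeed(seed)
  // A fresh instance with its own RNG, so a copy never shares state with the original.
  override def copy(): SimpleGenerator = new SimpleUniform()
}

object ApiCheck {
  // Reseeds the generator, then draws n values.
  def sample(gen: SimpleGenerator, seed: Long, n: Int = 1000): IndexedSeq[Double] = {
    gen.setSeed(seed)
    (0 until n).map(_ => gen.nextValue())
  }

  // True when the reseeding and copy() contracts checked by apiChecks all hold.
  def check(gen: SimpleGenerator): Boolean = {
    val gen2 = gen.copy()
    val sameSeedSameSequence = sample(gen, 42L) == sample(gen, 42L)
    val differentSeedsDiffer = sample(gen, 0L) != sample(gen2, 1L)
    val copyReplaysOriginal = sample(gen, 0L) == sample(gen2, 0L)
    sameSeedSameSequence && differentSeedsDiffer && copyReplaysOriginal
  }
}
```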
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389681 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD} +import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.util.StatCounter + +/** + * Note: avoid including APIs that do not set the seed for the RNG in unit tests + * in order to guarantee deterministic behavior. 
+ * + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged + */ +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable { + + def testGeneratedRDD(rdd: RDD[Double], + expectedSize: Long, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +val stats = rdd.stats() +assert(expectedSize === stats.count) +assert(expectedNumPartitions === rdd.partitions.size) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + // assume test RDDs are small + def testGeneratedVectorRDD(rdd: RDD[Vector], + expectedRows: Long, + expectedColumns: Int, + expectedNumPartitions: Int, + expectedMean: Double, + expectedStddev: Double, + epsilon: Double = 0.01) { +assert(expectedNumPartitions === rdd.partitions.size) +val values = new ArrayBuffer[Double]() +rdd.collect.foreach { vector => { + assert(vector.size === expectedColumns) + values ++= vector.toArray +}} +assert(expectedRows === values.size / expectedColumns) +val stats = new StatCounter(values) +assert(math.abs(stats.mean - expectedMean) < epsilon) +assert(math.abs(stats.stdev - expectedStddev) < epsilon) + } + + test("RandomRDD sizes") { + +// some cases where size % numParts != 0 to test getPartitions behaves correctly +for ((size, numPartitions) <- List((1, 6), (12345, 1), (1000, 101))) { + val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L) + assert(rdd.count() === size) + assert(rdd.partitions.size === numPartitions) + + // check that partition sizes are balanced + val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble) --- End diff -- `( ` -> `(`
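`testGeneratedRDD` and `testGeneratedVectorRDD` compare the sample mean and standard deviation against expected values within `epsilon`. Below is a minimal sketch of that comparison on plain Scala collections. It assumes population statistics (divide by n), which is what Spark's `StatCounter.stdev` reports; `sampleStdev` divides by n - 1 instead. `SampleStats` is a hypothetical name. For U[0, 1) the expected values are 0.5 and 1/sqrt(12), as in the `UniformGenerator` test.

```scala
object SampleStats {
  // Population mean and standard deviation (variance divided by n).
  def meanAndStdev(xs: Seq[Double]): (Double, Double) = {
    val n = xs.size.toDouble
    val mean = xs.sum / n
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / n
    (mean, math.sqrt(variance))
  }

  // Same shape as the suite's `math.abs(actual - expected) < epsilon` checks.
  def closeTo(actual: Double, expected: Double, epsilon: Double): Boolean =
    math.abs(actual - expected) < epsilon
}
```

When choosing `epsilon`, keep in mind that the standard error of a sample mean is σ/√n. The tolerance therefore needs to scale with the number of samples drawn, not only with the distribution.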
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389665 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, +val size: Int, +val generator: DistributionGenerator, +val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size + require(size > 0, "Positive partition size required.") +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue, +"Partition size cannot exceed Int.MaxValue") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue, +"Partition size cannot exceed 
Int.MaxValue") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +var start: Long = 0 +var end: Long = 0 +val random = new Random(seed) +while (i < numSlices) { + end = ((i + 1) * size) / numSlices + partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong()) + start = end + i += 1 +} +partitions.asInstanceOf[Array[Partition]] + } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every time the content of the RDD is examined. + def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = { +val generator = partition.generator.copy() +generator.setSeed(partition.seed) +Iterator.fill(partition.size)(generator.nextValue()) + } + + // The RNG has to be reset every time the iterator is requested to guarantee same data + // every
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389640 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, +val size: Int, +val generator: DistributionGenerator, +val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size + require(size > 0, "Positive partition size required.") --- End diff -- If `numPartitions` > `size`, there would be empty partitions. We should allow this case, because it happens when a user relies on the default number of partitions, which may be greater than `size`.
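Following this suggestion, the partition check can accept zero-size partitions while still rejecting negative sizes. The sketch below is hypothetical and simplified: `SketchPartition` drops the generator field and uses `scala.util.Random`. It mirrors the boundary arithmetic `end = ((i + 1) * size) / numSlices` and the per-partition seeding of the quoted `RandomRDD.getPartitions`. It also reseeds on every call to `values`, so recomputing a partition returns the same data.

```scala
import scala.util.Random

// Hypothetical, simplified partition: index, element count, and RNG seed only.
final case class SketchPartition(index: Int, size: Int, seed: Long) {
  // Zero is allowed: numSlices > size legitimately yields empty partitions.
  require(size >= 0, "Nonnegative partition size required.")
}

object SketchPartitions {
  def partitions(size: Long, numSlices: Int, seed: Long): Array[SketchPartition] = {
    require(size >= 0, "Nonnegative RDD size required.")
    require(numSlices > 0, "Positive number of partitions required.")
    require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
      "Partition size cannot exceed Int.MaxValue.")
    // One master RNG derives a distinct seed for each partition.
    val random = new Random(seed)
    val seeds = Array.fill(numSlices)(random.nextLong())
    Array.tabulate(numSlices) { i =>
      // Balanced split: partition sizes differ by at most one.
      val start = (i.toLong * size) / numSlices
      val end = ((i + 1).toLong * size) / numSlices
      SketchPartition(i, (end - start).toInt, seeds(i))
    }
  }

  // A fresh RNG per call makes every recomputation of a partition identical.
  def values(p: SketchPartition): Iterator[Double] = {
    val rng = new Random(p.seed)
    Iterator.fill(p.size)(rng.nextDouble())
  }
}
```

With `size = 1` and `numSlices = 6`, as in the `(1, 6)` case of the quoted suite, this yields five empty partitions and one partition holding the single element.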
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389629 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +import scala.util.Random + +private[mllib] class RandomRDDPartition(override val index: Int, +val size: Int, +val generator: DistributionGenerator, +val seed: Long) extends Partition { + // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size --- End diff -- The comment needs an update, because a Long greater than Int.MaxValue can be cast to a positive Int, which `require(size > 0)` would not catch: ~~~ scala> 100000000000L.toInt res0: Int = 1215752192 ~~~
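Here is the reviewer's point made concrete. Narrowing a Long with `.toInt` keeps only the low 32 bits, so an out-of-range size can wrap around to a positive value that `require(size > 0)` accepts. The sketch below assumes the fix is to validate the Long before narrowing. `SafeCast`, `toPartitionSize`, and `toPartitionSizeExact` are hypothetical names. `java.lang.Math.toIntExact` (Java 8+) is a JDK alternative that throws `ArithmeticException` on overflow.

```scala
object SafeCast {
  // 100000000000L needs more than 32 bits; .toInt keeps only the low 32 bits,
  // which here happen to form a positive Int (1215752192).
  val wrapped: Int = 100000000000L.toInt

  // Validate the range on the Long *before* narrowing it.
  def toPartitionSize(size: Long): Int = {
    require(size >= 0L && size <= Int.MaxValue,
      s"Partition size $size is outside [0, Int.MaxValue].")
    size.toInt
  }

  // JDK alternative: throws ArithmeticException on overflow
  // (note that it does not reject negative values).
  def toPartitionSizeExact(size: Long): Int = java.lang.Math.toIntExact(size)
}
```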
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389602 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * Returns an i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable for non-locking concurrent usage. + */ + def copy(): DistributionGenerator +} + +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. 
+ private val random = new XORShiftRandom() + + override def nextValue(): Double = { +random.nextDouble() + } + + override def setSeed(seed: Long) = random.setSeed(seed) + + override def copy(): UniformGenerator = new UniformGenerator() +} + +/** + * Generates i.i.d. samples from the Standard Normal Distribution. --- End diff -- `Standard Normal Distribution` -> `standard normal distribution`
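The quoted `UniformGenerator` uses an unsynchronized XORShift RNG, and each `copy()` gets its own RNG instance, so concurrent tasks never contend for a lock. Below is a minimal sketch of that idea. It assumes Marsaglia's xorshift64 with the 13/7/17 shift triple and is not Spark's `XORShiftRandom`. `XorShift64` and `XorShiftUniform` are hypothetical names.

```scala
// Minimal xorshift64 (Marsaglia, 13/7/17 shifts). Deliberately unsynchronized:
// each instance must be confined to a single thread.
final class XorShift64(initialSeed: Long) {
  private var state: Long = 0L
  setSeed(initialSeed)

  // xorshift has an all-zero fixed point, so map seed 0 to a nonzero state.
  def setSeed(seed: Long): Unit = state = if (seed == 0L) 1L else seed

  def nextLong(): Long = {
    state ^= state << 13
    state ^= state >>> 7
    state ^= state << 17
    state
  }

  // Top 53 bits scaled into [0.0, 1.0).
  def nextDouble(): Double = (nextLong() >>> 11).toDouble / (1L << 53).toDouble
}

// Hypothetical analogue of UniformGenerator: copy() never shares the RNG, and
// callers are expected to reseed each copy (as RandomRDD.getPointIterator does).
final class XorShiftUniform(seed: Long = 42L) {
  private val rng = new XorShift64(seed)
  def setSeed(seed: Long): Unit = rng.setSeed(seed)
  def nextValue(): Double = rng.nextDouble()
  def copy(): XorShiftUniform = new XorShiftUniform()
}
```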
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389618 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +/** + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + * + * TODO Generate RDD[Vector] from multivariate distributions. + */ +object RandomRDDGenerators { + + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. 
+ */ + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, uniform, size, numPartitions, seed) + } + + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0]. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0]. + */ + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. + * @param seed Seed for the RNG that generates the seed for the generator in each partition. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val normal = new StandardNormalGenerator() +randomRDD(sc, normal, size, numPartitions, seed) + } + + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD. 
+ * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0). + */ + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + /** + * Generates an RDD comprised of i.i.d samples from the standard normal distribution. + * sc.defaultParallelism used for the number of partitions in the RDD. + * + * @param sc SparkContext used to create the RDD. + * @param size Size of the RDD. + * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389612 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +/** + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution. + * + * TODO Generate RDD[Vector] from multivariate distributions. --- End diff -- Move the `TODO: ...` out of the Scaladoc comment and into the code block as a regular comment. Otherwise, it becomes part of the generated documentation.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15389599 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. --- End diff -- `i.i.d` -> `i.i.d.`, here and in other places.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-49946571 QA results for PR 1520: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): trait DistributionGenerator extends Pseudorandom with Serializable { class UniformGenerator extends DistributionGenerator { class StandardNormalGenerator extends DistributionGenerator { class PoissonGenerator(val mean: Double) extends DistributionGenerator { For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-49935146 QA tests have started for PR 1520. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15307029 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
+object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val normal = new StandardNormalGenerator() +randomRDD(sc, size, numPartitions, normal, seed) + } + + def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, seed) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Double] = { +val poisson = new PoissonGenerator(mean) +randomRDD(sc, size, numPartitions, poisson, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { +poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, 
+ numPartitions: Int, + distribution: DistributionGenerator, --- End diff -- Hmm, let's go with `generator` to be clear.
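The quoted TODO suggests replacing the many `uniformRDD`/`normalRDD`/`poissonRDD` overloads with a builder that sets `numPartitions`, `seed`, and so on. Below is a minimal sketch of that alternative. It assumes a hypothetical immutable `RandomDataSpec` whose `generate` method returns local per-partition samples instead of an RDD, so it runs without a `SparkContext`. The `generator` function parameter plays the role the reviewer proposes for the renamed argument.

```scala
import scala.util.Random

// Hypothetical builder: each `with*` call returns an updated immutable copy.
final case class RandomDataSpec(
    size: Int,
    numPartitions: Int = 4, // stand-in for sc.defaultParallelism
    seed: Long = Random.nextLong()) {

  def withNumPartitions(n: Int): RandomDataSpec = copy(numPartitions = n)
  def withSeed(s: Long): RandomDataSpec = copy(seed = s)

  // Produces `size` samples split across `numPartitions` local "partitions",
  // each drawn from its own RNG seeded by a master RNG.
  def generate(generator: Random => Double): Seq[Seq[Double]] = {
    val master = new Random(seed)
    (0 until numPartitions).map { i =>
      val count = ((i + 1).toLong * size / numPartitions - i.toLong * size / numPartitions).toInt
      val rng = new Random(master.nextLong())
      Seq.fill(count)(generator(rng))
    }
  }
}
```

Because each `with*` call returns a new value, a partially configured spec can be shared and reused without one caller's settings leaking into another's.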
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15305960 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) + } else { +new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) + } + i += 1 +} +partitions.asInstanceO
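The diff's comment explains that these iterators exist because Scala `Range` objects cannot hold more than `Int.MaxValue` elements. One detail worth flagging: `private var currentSize = 0` infers `Int`. If `numElem` ever exceeded `Int.MaxValue`, the counter would wrap to a negative value and `hasNext` would never return `false`. Below is a minimal sketch with a `Long` counter. It assumes the draw is passed in as a plain `() => Double` (a hypothetical simplification of `rng.nextValue()`) and that vectors are plain `Array[Double]` values rather than MLlib's `DenseVector`.

```scala
// Sketch: fixed-size iterators with a Long counter, so the count cannot
// overflow even if numElem is larger than Int.MaxValue.
class FixedSizePointIterator(numElem: Long, nextValue: () => Double)
    extends Iterator[Double] {
  private var currentSize = 0L
  override def hasNext: Boolean = currentSize < numElem
  override def next(): Double = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    currentSize += 1
    nextValue()
  }
}

class FixedSizeVectorIterator(numElem: Long, vectorSize: Int, nextValue: () => Double)
    extends Iterator[Array[Double]] {
  private var currentSize = 0L
  override def hasNext: Boolean = currentSize < numElem
  override def next(): Array[Double] = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    currentSize += 1
    // Array.fill evaluates nextValue() once per slot.
    Array.fill(vectorSize)(nextValue())
  }
}
```

`Array.fill(vectorSize)(nextValue())` produces the same values as `(0 until vectorSize).map { _ => rng.nextValue() }.toArray`, but it skips the intermediate collection.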
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15275118 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) + } else { +new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) + } + i += 1 +} +partitions.asInstanc
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15275100 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) + } else { +new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) + } + i += 1 +} +partitions.asInstanc
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15271948 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
+object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val normal = new StandardNormalGenerator() +randomRDD(sc, size, numPartitions, normal, seed) + } + + def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, seed) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Double] = { +val poisson = new PoissonGenerator(mean) +randomRDD(sc, size, numPartitions, poisson, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { +poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, 
+ numPartitions: Int, + distribution: DistributionGenerator, --- End diff -- Which do you prefer, `rng` or `generator`? `rng` feels like an instance of `RandomNumberGenerator`. As we discussed, `DistributionGenerator` is different from `RandomNumberGenerator`.
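The TODO above `RandomRDDGenerators` notes that the number of `uniformRDD`/`normalRDD`/`poissonRDD` overloads could be reduced, and it mentions a "generator pattern" for doing so. The sketch below uses a different technique instead: Scala default and named arguments. It runs without Spark. Partitions are modeled as a `Seq[Seq[Double]]` instead of an `RDD`, and the default of 4 partitions, the per-partition seed `seed + i`, and the `LocalRandomGenerators` name are all hypothetical.

```scala
import scala.util.Random

// Hypothetical local stand-in for RandomRDDGenerators: a Seq of partitions
// instead of an RDD, so the sketch runs without a SparkContext.
object LocalRandomGenerators {
  // One method with defaults replaces the four uniformRDD overloads.
  def uniform(size: Int,
              numPartitions: Int = 4,
              seed: Long = Random.nextLong()): Seq[Seq[Double]] = {
    require(size > 0, "Positive size required.")
    require(numPartitions > 0, "Positive number of partitions required.")
    val base = size / numPartitions
    val extra = size % numPartitions
    (0 until numPartitions).map { i =>
      // The first `extra` partitions take one extra element each.
      val partitionSize = base + (if (i < extra) 1 else 0)
      val rng = new Random(seed + i) // distinct, reproducible stream per partition
      Seq.fill(partitionSize)(rng.nextDouble())
    }
  }
}
```

Callers can then write `LocalRandomGenerators.uniform(1000, seed = 42L)` or `LocalRandomGenerators.uniform(1000, numPartitions = 8)`. One trade-off: Java callers do not get Scala default arguments and must pass every parameter. That is one reason to keep explicit overloads on a public API.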
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15271907 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, --- End diff -- So let's not put more than `Int.MaxValue` items into a single partition. If the size is beyond this limit, we should throw an error. Unlike `collect`, `glom` converts `RDD[T]` to `RDD[Array[T]]`, coalescing all elements within each partition into an array. `glom` is not frequently used, but we should support `cache`, which means we shouldn't allow more than `Int.MaxValue` items in a single partition.
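As the comment reads, `cache` and `glom` both need to hold an entire partition in one in-memory collection, and JVM arrays are indexed by `Int`, so no partition should exceed `Int.MaxValue` items. Below is a minimal sketch of the up-front check. It assumes balanced slicing, so the largest partition holds the ceiling of `size / numPartitions` elements. The `PartitionSizeCheck` object and its methods are hypothetical.

```scala
// Hypothetical validation helper for the Int.MaxValue-per-partition rule.
object PartitionSizeCheck {
  // Ceiling division that avoids the overflow risk of (a + b - 1) / b.
  private def ceilDiv(a: Long, b: Long): Long = a / b + (if (a % b == 0) 0L else 1L)

  // Largest partition under balanced slicing: ceil(size / numPartitions).
  def maxPartitionSize(size: Long, numPartitions: Int): Long = {
    require(size > 0, "Positive RDD size required.")
    require(numPartitions > 0, "Positive number of partitions required.")
    ceilDiv(size, numPartitions)
  }

  // Throws IllegalArgumentException (via require) if a partition would be too large.
  def validate(size: Long, numPartitions: Int): Unit = {
    val largest = maxPartitionSize(size, numPartitions)
    require(largest <= Int.MaxValue,
      s"Largest partition would hold $largest items, more than Int.MaxValue; " +
        s"use at least ${ceilDiv(size, Int.MaxValue)} partitions.")
  }
}
```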
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15271812 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) --- End diff -- It feels more secure if people see the following lines in the same group. ~~~ val thisGenerator = generator.copy() thisGenerator.s
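The suggestion here is to keep the copy of a generator and its seeding side by side, so that each partition's setup reads as one unit. Note also that the quoted `getPartitions` gives every `RandomRDDPartition` the same `seed`. Unless the truncated iterator code offsets that seed, copies seeded directly with it would emit identical sequences. Below is a minimal sketch of the grouped setup. `SeedableGenerator`, `copy()`, `setSeed`, and the `seed + i` offset are assumptions for illustration only. This is not a reconstruction of the truncated snippet in the review comment.

```scala
import scala.util.Random

// Hypothetical minimal generator interface for this sketch.
trait SeedableGenerator {
  def copy(): SeedableGenerator
  def setSeed(seed: Long): Unit
  def nextValue(): Double
}

class UniformSketch extends SeedableGenerator {
  private val random = new Random()
  override def copy(): UniformSketch = new UniformSketch()
  override def setSeed(seed: Long): Unit = random.setSeed(seed)
  override def nextValue(): Double = random.nextDouble()
}

object PerPartitionGenerators {
  // Copy and seed in the same place, so no two partitions share generator
  // state or an identical seed.
  def forPartitions(generator: SeedableGenerator,
                    numPartitions: Int,
                    seed: Long): IndexedSeq[SeedableGenerator] =
    (0 until numPartitions).map { i =>
      val thisGenerator = generator.copy()
      thisGenerator.setSeed(seed + i) // assumed offset by partition index
      thisGenerator
    }
}
```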
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15266943 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
+object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val normal = new StandardNormalGenerator() +randomRDD(sc, size, numPartitions, normal, seed) + } + + def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, seed) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Double] = { +val poisson = new PoissonGenerator(mean) +randomRDD(sc, size, numPartitions, poisson, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { +poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, 
+ numPartitions: Int, + distribution: DistributionGenerator, --- End diff -- copypasta fail. meant to change all of them to `rng`.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15266470 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices --- End diff -- Switching to the slicing logic from `ParallelCollectionRDD` for more balanced partition sizes.
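The original split puts the entire remainder `size % numSlices` into the first partition, which can make that partition much larger than the rest. The sketch below compares that scheme with a position-based split in which partition `i` covers `[i * size / numSlices, (i + 1) * size / numSlices)`. This is the kind of balanced slicing the comment refers to; whether it matches `ParallelCollectionRDD` line for line is an assumption. The `Slicing` object is hypothetical.

```scala
// Hypothetical comparison of two ways to split `size` elements into `numSlices` partitions.
object Slicing {
  // Scheme in the quoted diff: the first partition absorbs the whole remainder.
  def remainderFirst(size: Long, numSlices: Int): Seq[Long] = {
    val other = size / numSlices
    val first = other + size % numSlices
    first +: Seq.fill(numSlices - 1)(other)
  }

  // Position-based scheme: partition i covers [i * size / n, (i + 1) * size / n).
  def balanced(size: Long, numSlices: Int): Seq[Long] =
    (0 until numSlices).map { i =>
      val start = i * size / numSlices
      val end = (i + 1) * size / numSlices
      end - start
    }
}
```

For `size = 19` and `numSlices = 10`, the first scheme produces one partition of 10 elements and nine partitions of 1 element each. The balanced scheme produces partitions of 1 or 2 elements. In general, balanced partition sizes differ by at most one.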
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15265929 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) --- End diff -- Or I could just add `.newInstance()` on line 118, which I prefer, since it's not necessary for us to have a new rng every time the RDD is computed. B
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15265778 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) + } else { +new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) + } + i += 1 +} +partitions.asInstanceO
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15265484 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator +} + +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator() extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. 
+ private val random = new XORShiftRandom() + + /** + * @return An i.i.d sample as a Double from U[0.0, 1.0]. + */ + override def nextValue(): Double = { +random.nextDouble() + } + + /** Set random seed. */ + override def setSeed(seed: Long) = random.setSeed(seed) + + override def newInstance(): UniformGenerator = new UniformGenerator() +} + +/** + * Generates i.i.d. samples from the Standard Normal Distribution. + */ +class StandardNormalGenerator() extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() --- End diff -- As with most random objects, the DistributionGenerator should be created with a default seed (so using it before calling `setSeed` is legal). I like how in Colt it's called `reseed` instead, but `setSeed` is also widely adopted.
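The behavior described in the comment above can be sketched as follows. The generator is usable immediately with a default seed, and `setSeed` makes a sequence reproducible. `SeededGenerator` and `UniformSketch` are hypothetical stand-ins for the PR's `DistributionGenerator` and `UniformGenerator`, and `java.util.Random` replaces Spark's `XORShiftRandom` so the snippet has no Spark dependency.

```scala
import java.util.Random

// Hypothetical stand-in for the DistributionGenerator trait in the diff; not Spark's API.
trait SeededGenerator extends Serializable {
  def nextValue(): Double
  def setSeed(seed: Long): Unit
}

class UniformSketch extends SeededGenerator {
  // java.util.Random's no-arg constructor picks a default seed,
  // so calling nextValue() before setSeed() is legal.
  private val random = new Random()

  override def nextValue(): Double = random.nextDouble()

  // Reseeding restarts the sequence deterministically.
  override def setSeed(seed: Long): Unit = random.setSeed(seed)
}
```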
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15265297 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, --- End diff -- There are no restrictions on the size of each Partition from the trait (in fact, a Partition doesn't even need to have a size). The restriction `size <= Int.MaxValue` only applies when the RDD is cached. In the case of `glom`, each partition needs to hold `<= Int.MaxValue` elements, since `glom` turns a partition into a single array and that's the max size of an array.
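Since the `Partition` trait itself places no limit on a partition's element count, the iterator that produces the values should count with a `Long`. As quoted in the diff, `private var currentSize = 0` is inferred as an `Int`. If `numElem > Int.MaxValue`, that counter would wrap to a negative value and `hasNext` would never become false.

Below is a minimal sketch with a `Long` counter. `LongCountIterator` and the `draw` function are hypothetical stand-ins for `FixedSizePointIterator` and `rng.nextValue()`.

```scala
// Hypothetical fixed-size iterator; `draw` stands in for rng.nextValue().
class LongCountIterator(numElem: Long, draw: () => Double) extends Iterator[Double] {
  // Long counter: an Int would wrap around after Int.MaxValue elements.
  private var produced = 0L

  override def hasNext: Boolean = produced < numElem

  override def next(): Double = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    produced += 1
    draw()
  }
}
```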
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15262023 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. 
+ */ + def newInstance(): DistributionGenerator --- End diff -- Ideally, `newInstance` would be an abstract static method on DistributionGenerator that takes an instance as an argument, which would better express that we're copying the class members into the new instance. However, Scala doesn't really support abstract static methods in interfaces, and a trait plus companion object combination would be messy for users to implement. So `copy` will do nicely here.
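One way to realize the `copy` approach is sketched below, using hypothetical names. Each generator implements `copy()` by constructing a new instance from its own parameters (here, a Poisson mean). The copy therefore shares the distribution but not the RNG state.

The Poisson draw uses Knuth's multiplication method, which is only suitable for small means. The PR itself wraps Colt's `cern.jet.random.Poisson` instead, and `java.util.Random` replaces `XORShiftRandom`.

```scala
import java.util.Random

// Hypothetical trait: copy() plays the role of the diff's newInstance().
trait CopyableGenerator extends Serializable {
  def nextValue(): Double
  def setSeed(seed: Long): Unit
  /** A generator with the same distribution parameters but its own RNG state. */
  def copy(): CopyableGenerator
}

// Poisson sampler via Knuth's multiplication method (only suitable for small means).
class PoissonSketch(val mean: Double) extends CopyableGenerator {
  require(mean > 0.0, "Poisson mean must be positive.")
  private val random = new Random()
  private val limit = math.exp(-mean)

  override def nextValue(): Double = {
    // Count how many uniform factors keep the running product above exp(-mean).
    var k = 0
    var p = random.nextDouble()
    while (p > limit) {
      k += 1
      p *= random.nextDouble()
    }
    k.toDouble
  }

  override def setSeed(seed: Long): Unit = random.setSeed(seed)

  // Copies the parameters, not the RNG state.
  override def copy(): PoissonSketch = new PoissonSketch(mean)
}
```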
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user falaki commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15246377 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. +object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { --- End diff -- Alternatively we can switch to a generator model. 
Doris suggested it in her TODO. What do you guys think?
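Read as a builder, the "generator model" could collapse the `uniformRDD` overloads into one entry point plus optional settings. Below is a minimal sketch without a Spark dependency. The `RandomRDDParams` class and its `resolve` method are hypothetical and are not what the PR adopted. `defaultParallelism` plays the role of `sc.defaultParallelism`. Scala's named and default arguments are another way to get a similar effect without a separate class.

```scala
// Hypothetical parameter holder for a random-RDD factory; not a Spark API.
final case class RandomRDDParams(
    size: Long,
    numPartitions: Option[Int] = None,
    seed: Option[Long] = None) {
  require(size > 0, "Positive RDD size required.")
  require(numPartitions.forall(_ > 0), "Positive number of partitions required.")

  def withNumPartitions(n: Int): RandomRDDParams = copy(numPartitions = Some(n))
  def withSeed(s: Long): RandomRDDParams = copy(seed = Some(s))

  /** Fills unset fields from defaults; randomSeed is only evaluated when no seed was given. */
  def resolve(defaultParallelism: Int, randomSeed: => Long): (Long, Int, Long) =
    (size, numPartitions.getOrElse(defaultParallelism), seed.getOrElse(randomSeed))
}
```

Usage would then look like `RandomRDDParams(1000000L).withNumPartitions(10).withSeed(42L)`, with one factory method consuming the resolved values.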
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15215522 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator +} + +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator() extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. 
+ private val random = new XORShiftRandom() + + /** + * @return An i.i.d sample as a Double from U[0.0, 1.0]. + */ + override def nextValue(): Double = { +random.nextDouble() + } + + /** Set random seed. */ + override def setSeed(seed: Long) = random.setSeed(seed) + + override def newInstance(): UniformGenerator = new UniformGenerator() +} + +/** + * Generates i.i.d. samples from the Standard Normal Distribution. + */ +class StandardNormalGenerator() extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. + private val random = new XORShiftRandom() --- End diff -- Is it allowed to use a DistributionGenerator before calling setSeed? It would seem simpler to disallow that, but it seems to be something it got from trait Pseudorandom.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15215432 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) + } else { +new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed) + } + i += 1 +} +partitions.asInstanc
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213097 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices +val otherPartitionSize = size / numSlices + +val partitions = new Array[RandomRDDPartition](numSlices) +var i = 0 +while (i < numSlices) { + partitions(i) = if (i == 0) { +new RandomRDDPartition(i, firstPartitionSize, rng, seed) --- End diff -- It is safer to make a copy in `compute` than here. In local mode, this may cause problems if a user uses the same generator to create two random R
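A minimal sketch of the copy-in-`compute` approach suggested above is shown below, using hypothetical names and no Spark dependency. Each partition descriptor holds a prototype generator plus its own seed, and `compute` copies and reseeds the prototype. Recomputing a partition then yields the same values, and two RDDs built from one generator never advance shared state.

Deriving per-partition seeds from a master `java.util.Random` is an assumption of this sketch. Partition sizes are `Int` here, matching `Iterator.fill`.

```scala
import java.util.Random

// Hypothetical minimal generator interface (stand-in for the PR's DistributionGenerator).
trait Gen extends Serializable {
  def nextValue(): Double
  def setSeed(seed: Long): Unit
  def copy(): Gen
}

class UniformGen extends Gen {
  private val random = new Random()
  def nextValue(): Double = random.nextDouble()
  def setSeed(seed: Long): Unit = random.setSeed(seed)
  def copy(): Gen = new UniformGen
}

// Hypothetical partition descriptor: the prototype generator is never advanced.
final case class PartitionSpec(index: Int, size: Int, prototype: Gen, seed: Long)

object CopyInCompute {
  /** Derives one seed per partition from a master seed (an assumption of this sketch). */
  def partitions(sizes: Seq[Int], prototype: Gen, masterSeed: Long): Seq[PartitionSpec] = {
    val seeds = new Random(masterSeed)
    sizes.zipWithIndex.map { case (n, i) => PartitionSpec(i, n, prototype, seeds.nextLong()) }
  }

  /** Copies and reseeds inside "compute", so every evaluation of a partition is identical. */
  def compute(split: PartitionSpec): Iterator[Double] = {
    val gen = split.prototype.copy()
    gen.setSeed(split.seed)
    Iterator.fill(split.size)(gen.nextValue())
  }
}
```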
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213078 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, --- End diff -- Could you double check whether we can have more than `Int.MaxValue` items in a single partition? It may break storage and a couple of RDD functions like `glom`.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213091 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) --- End diff -- If we can't have more than `Int.MaxValue` items per partition, this is simply `Iterator.fill(numElem)(rng.nextValue())`.
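Here is a minimal sketch of the two options. `Gen` and `UniformGen` are hypothetical stand-ins for `DistributionGenerator` and its implementations: only `nextValue()` is assumed, and `scala.util.Random` replaces the real RNGs. One detail in the quoted iterators is worth checking. `private var currentSize = 0` is inferred as an `Int`, so if `numElem` could exceed `Int.MaxValue`, the counter would overflow before it ever reached `numElem`. The Long-counting variant below therefore declares its counter as `0L`.

```scala
import scala.util.Random

// Hypothetical stand-in for DistributionGenerator: only nextValue() is assumed.
trait Gen { def nextValue(): Double }

// Hypothetical uniform generator backed by scala.util.Random.
class UniformGen(seed: Long) extends Gen {
  private val rnd = new Random(seed)
  def nextValue(): Double = rnd.nextDouble()
}

object PointIterators {
  // Option 1: if a partition never holds more than Int.MaxValue items,
  // the standard library already provides a fixed-size iterator.
  def filled(numElem: Int, rng: Gen): Iterator[Double] =
    Iterator.fill(numElem)(rng.nextValue())

  // Option 2: support Long counts. The counter must also be a Long,
  // otherwise it would overflow after Int.MaxValue increments.
  class FixedSizePointIterator(numElem: Long, rng: Gen) extends Iterator[Double] {
    private var currentSize = 0L
    override def hasNext: Boolean = currentSize < numElem
    override def next(): Double = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      currentSize += 1
      rng.nextValue()
    }
  }
}
```

Both variants draw from the generator in the same order, so with equal seeds they yield the same values.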
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213082 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx --- End diff -- You can put `override` directly in the constructor.
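Here is a sketch of that suggestion. `Partition` is a hypothetical local stand-in for `org.apache.spark.Partition`, assumed to declare an abstract `index: Int`. The `rng` field is left out for brevity.

```scala
// Hypothetical stand-in for org.apache.spark.Partition (abstract `index: Int` assumed).
trait Partition extends Serializable {
  def index: Int
}

// `override val` in the constructor replaces both the separate `idx`
// parameter and the `override val index: Int = idx` member in the body.
class RandomRDDPartition(
    override val index: Int,
    val size: Long,
    val seed: Long) extends Partition
```

Because `index` is abstract in the trait, the `override` modifier is optional here. Writing it still makes the intent explicit.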
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213096 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.mllib.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.random.DistributionGenerator +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +private[mllib] class RandomRDDPartition(val idx: Int, +val size: Long, +val rng: DistributionGenerator, +val seed: Long) extends Partition { + + override val index: Int = idx + +} + +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue +private[mllib] class RandomRDD(@transient private var sc: SparkContext, +size: Long, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getPointIterator(split) + } + + override def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext, +size: Long, +vectorSize: Int, +numSlices: Int, +@transient rng: DistributionGenerator, +@transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) { + + require(size > 0, "Positive RDD size required.") + require(numSlices > 0, "Positive number of partitions required") + require(vectorSize > 0, "Positive vector size required.") + + override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = { +val split = splitIn.asInstanceOf[RandomRDDPartition] +RandomRDD.getVectorIterator(split, vectorSize) + } + + override protected def getPartitions: Array[Partition] = { +RandomRDD.getPartitions(size, numSlices, rng, seed) + } +} + +private[mllib] object RandomRDD { 
+ + private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator) +extends Iterator[Double] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Double = { + currentSize += 1 + rng.nextValue() +} + } + + private[mllib] class FixedSizeVectorIterator(val numElem: Long, + val vectorSize: Int, + val rng: DistributionGenerator) +extends Iterator[Vector] { + +private var currentSize = 0 + +override def hasNext: Boolean = currentSize < numElem + +override def next(): Vector = { + currentSize += 1 + new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray) +} + } + + def getPartitions(size: Long, + numSlices: Int, + rng: DistributionGenerator, + seed: Long): Array[Partition] = { + +val firstPartitionSize = size / numSlices + size % numSlices --- End diff -- This is not evenly distributed. For example, when `size = 1000` and `numSlices = 101`, the first partition gets 9 + 91 = 100 items while each of the other 100 partitions gets only 9.
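Here is one possible even split. It assumes, as the name `firstPartitionSize` suggests, that the remaining partitions each receive `size / numSlices` items. The idea is to give one extra item to each of the first `size % numSlices` partitions, so no two partition sizes differ by more than one. Both helpers are hypothetical: they only compute sizes and do not build Spark partitions.

```scala
object PartitionSizes {
  // Sizes implied by the quoted code: the whole remainder goes to the first partition.
  def firstGetsRemainder(size: Long, numSlices: Int): Seq[Long] = {
    val base = size / numSlices
    (base + size % numSlices) +: Seq.fill(numSlices - 1)(base)
  }

  // Even split: the first (size % numSlices) partitions get one extra item,
  // so partition sizes differ by at most one. No i * size products are
  // formed, so this cannot overflow for large Long sizes.
  def even(size: Long, numSlices: Int): Seq[Long] = {
    require(size > 0 && numSlices > 0, "Positive size and number of partitions required.")
    val base = size / numSlices
    val remainder = size % numSlices
    (0 until numSlices).map(i => if (i < remainder) base + 1 else base)
  }
}
```

For `size = 1000` and `numSlices = 101`, the even split gives 91 partitions of 10 items and 10 partitions of 9.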
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213069 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
+object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { --- End diff -- It is very confusing to have both `(SparkContext, Long, Int)` and `(SparkContext, Long, Long)`. If a user doesn't notice `(SparkContext, Long, Int)`, treats the third argument as the seed, and passes an integer, it actually sets the number of partitions. Maybe we should only allow default values at the end. That is,
~~~
def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long)
def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int)
~~~
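Below is a small Spark-free sketch of the ambiguity. The objects are hypothetical and mirror the shape of the overloads: the `SparkContext` argument is dropped, and the methods return strings instead of RDDs. When both `(Long, Int)` and `(Long, Long)` are defined, an `Int` literal in the second position here silently selects the `numPartitions` overload. This happens because Scala's overload resolution picks the most specific applicable alternative.

```scala
object OverloadDemo {
  // Same shape as the overloads under review: (size, numPartitions) and (size, seed).
  def uniformRDD(size: Long, numPartitions: Int): String = s"numPartitions=$numPartitions"
  def uniformRDD(size: Long, seed: Long): String = s"seed=$seed"
}

object ProposedApi {
  // Only trailing parameters are optional, as suggested in the review.
  def uniformRDD(size: Long, numPartitions: Int, seed: Long): String =
    s"numPartitions=$numPartitions, seed=$seed"

  def uniformRDD(size: Long, numPartitions: Int): String =
    uniformRDD(size, numPartitions, scala.util.Random.nextLong())
}
```

With the proposed API, a seed can only be passed together with an explicit number of partitions, so the positional mix-up cannot occur.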
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213063 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator +} + +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator() extends DistributionGenerator { --- End diff -- Is `()` necessary? 
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213059 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. --- End diff -- Change `@return` to `Returns`. Otherwise the summary will be empty in the generated docs.
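For reference, the requested documentation style looks like this. Scaladoc uses the first sentence of a member's main description as its summary, and a comment that contains only an `@return` tag has no main description. `ConstantGenerator` is a hypothetical implementation included only so the trait can be exercised, and the `Pseudorandom` parent is omitted.

```scala
/**
 * Trait for random number generators that generate i.i.d. values from a distribution.
 */
trait DistributionGenerator extends Serializable {

  /**
   * Returns an i.i.d. sample as a Double from an underlying distribution.
   */
  def nextValue(): Double
}

// Hypothetical implementation, only so the trait can be exercised.
class ConstantGenerator(value: Double) extends DistributionGenerator {
  override def nextValue(): Double = value
}
```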
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213064 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator +} + +/** + * Generates i.i.d. samples from U[0.0, 1.0] + */ +class UniformGenerator() extends DistributionGenerator { + + // XORShiftRandom for better performance. Thread safety isn't necessary here. 
+ private val random = new XORShiftRandom() + + /** + * @return An i.i.d sample as a Double from U[0.0, 1.0]. + */ + override def nextValue(): Double = { +random.nextDouble() + } + + /** Set random seed. */ --- End diff -- Doc is not necessary for overridden methods, unless you want to update it.
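Here is a sketch of the same idea. The overriding `setSeed` carries no comment and, assuming Scaladoc's usual comment inheritance, picks up the parent's documentation. `Pseudorandom` is a local stand-in for `org.apache.spark.util.random.Pseudorandom`, and `scala.util.Random` replaces `XORShiftRandom`. Both substitutions are assumptions of this sketch.

```scala
import scala.util.Random

// Hypothetical stand-in for org.apache.spark.util.random.Pseudorandom.
trait Pseudorandom {
  /** Sets the random seed. */
  def setSeed(seed: Long): Unit
}

/** Generates i.i.d. samples from U[0.0, 1.0). */
class UniformGenerator extends Pseudorandom {
  // scala.util.Random stands in for XORShiftRandom in this sketch.
  private val random = new Random()

  /** Returns an i.i.d. sample as a Double from U[0.0, 1.0). */
  def nextValue(): Double = random.nextDouble()

  // No doc comment: Scaladoc inherits "Sets the random seed." from Pseudorandom.
  override def setSeed(seed: Long): Unit = random.setSeed(seed)
}
```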
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213072 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils + +// TODO add Scaladocs once API fully approved +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring +// down the number of methods here. 
+object RandomRDDGenerators { + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val uniform = new UniformGenerator() +randomRDD(sc, size, numPartitions, uniform, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, seed) + } + + def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +uniformRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = { +uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = { +val normal = new StandardNormalGenerator() +randomRDD(sc, size, numPartitions, normal, seed) + } + + def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, seed) + } + + def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = { +normalRDD(sc, size, numPartitions, Utils.random.nextLong) + } + + def normalRDD(sc: SparkContext, size: Long): RDD[Double] = { +normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, + size: Long, + numPartitions: Int, + mean: Double, + seed: Long): RDD[Double] = { +val poisson = new PoissonGenerator(mean) +randomRDD(sc, size, numPartitions, poisson, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, seed) + } + + def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = { +poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong) + } + + def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = { +poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong) + } + + def randomRDD(sc: SparkContext, + size: Long, 
+ numPartitions: Int, + distribution: DistributionGenerator, --- End diff -- We need to be consistent in the argument name: `distribution` is used here but `rng` is used in `randomVectorRDD`. `generator` sounds better to me than `distribution`, because a `DistributionGenerator` is a generator, not a distribution, and in this context we don't need the longer `distributionGenerator`.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213061 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its --- End diff -- `partition` has no context here. Maybe simply mention that this is for running multiple instances concurrently.
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1520#discussion_r15213062 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} + +/** + * Trait for random number generators that generate i.i.d values from a distribution. + */ +trait DistributionGenerator extends Pseudorandom with Serializable { + + /** + * @return An i.i.d sample as a Double from an underlying distribution. + */ + def nextValue(): Double + + /** + * @return A copy of the DistributionGenerator with a new instance of the rng object used in the + * class when applicable. Each partition has a unique seed and therefore requires its + * own instance of the DistributionGenerator. + */ + def newInstance(): DistributionGenerator --- End diff -- I saw your argument about using `clone`. Between `copy` and `newInstance`, I think `copy` is better. 
For example, in `PoissonGenerator` we need to copy the mean, which the name `newInstance` does not reflect.
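Here is a minimal sketch of a `copy()`-based design, built on a trimmed-down hypothetical trait rather than the PR's final API. The copy carries over `mean` but gets its own RNG, so instances can be seeded and run independently. The sampler uses Knuth's multiplication method with `scala.util.Random` instead of the Colt `Poisson` class imported in the diff. That method is only practical for small means.

```scala
import scala.util.Random

// Hypothetical, trimmed-down generator trait for this sketch.
trait DistributionGenerator extends Serializable {
  def nextValue(): Double
  def setSeed(seed: Long): Unit
  /** Returns a copy with its own RNG, for running multiple instances concurrently. */
  def copy(): DistributionGenerator
}

class PoissonGenerator(val mean: Double) extends DistributionGenerator {
  require(mean > 0.0, "mean must be positive")
  private val rng = new Random()

  // Knuth's multiplication method: count uniforms until their product
  // drops to exp(-mean) or below. Runtime grows with the mean.
  override def nextValue(): Double = {
    val limit = math.exp(-mean)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k.toDouble
  }

  override def setSeed(seed: Long): Unit = rng.setSeed(seed)

  // The copy keeps the distribution parameter but gets a fresh RNG.
  override def copy(): PoissonGenerator = new PoissonGenerator(mean)
}
```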
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-49700088 QA results for PR 1520:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
trait DistributionGenerator extends Pseudorandom with Serializable {
class UniformGenerator() extends DistributionGenerator {
class StandardNormalGenerator() extends DistributionGenerator {
class PoissonGenerator(val mean: Double) extends DistributionGenerator {
For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user dorx commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-49695577 @falaki @jkbradley @mengxr
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1520#issuecomment-49695427 QA tests have started for PR 1520. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull
[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator
GitHub user dorx opened a pull request: https://github.com/apache/spark/pull/1520

[SPARK-2514] [mllib] Random RDD generator

Utilities for generating random RDDs. RandomRDD and RandomVectorRDD are created instead of using `sc.parallelize(range:Range)` because `Range` objects in Scala can only have `size <= Int.MaxValue`. The object `RandomRDDGenerators` can be transformed into a generator class to reduce the number of auxiliary methods for optional arguments.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dorx/spark randomRDD

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1520.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1520

commit 888144416ced2b6d4c4839ac95b8a3feb2b3aba1
Author: Doris Xin
Date: 2014-07-12T01:02:01Z
RandomRDDGenerator: initial design
Looking for feedback on design decisions. Very rough draft and untested.

commit 7cb0e406793db493cee72cb91ec02475c95c8de7
Author: Doris Xin
Date: 2014-07-12T01:15:56Z
fix for data inconsistency

commit 49ed20d9a30b0ba5d809974bbcf48cc76a45d68e
Author: Doris Xin
Date: 2014-07-12T01:30:15Z
alternative poisson distribution generator

commit f46d928c4e3e71ced4ede9295ef645fb714c9a69
Author: Doris Xin
Date: 2014-07-19T02:13:58Z
WIP

commit df5bcffc320bab85f6c5925b244fe9885d6d0eb5
Author: Doris Xin
Date: 2014-07-21T07:47:07Z
Merge branch 'generator' into randomRDD

commit 92d6f1c3ca0f22371f7f0387b875ac16d5030ffb
Author: Doris Xin
Date: 2014-07-21T07:48:12Z
solution for Cloneable

commit d56cacbde7a0550f53b59696ad7c7014c827f3f7
Author: Doris Xin
Date: 2014-07-22T01:23:19Z
impl with RandomRDD

commit bc90234c9639bfb3f4581af63cf4bf370c61e18b
Author: Doris Xin
Date: 2014-07-22T03:37:40Z
units passed.

commit aec68eb167ac9f11c64d95c698009cbf8919bd4b
Author: Doris Xin
Date: 2014-07-22T03:42:31Z
newline

commit 063ea0b48b769f7f8477ca2364f8e676f93c297e
Author: Doris Xin
Date: 2014-07-22T03:43:57Z
Merge branch 'master' into randomRDD
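The description above and the TODO in `RandomRDDGenerators` both mention replacing the overloads with a generator-style class. Here is one possible shape, sketched with a hypothetical immutable parameter holder. It uses no Spark types, and `resolve` takes the default parallelism that would otherwise come from `sc.defaultParallelism`. Only `size` is required. The optional settings are given by name, so an `Int` partition count can no longer be mistaken for a `Long` seed by position.

```scala
import scala.util.Random

// Hypothetical parameter holder replacing the optional-argument overloads.
final case class RandomRDDParams(
    size: Long,
    numPartitions: Option[Int] = None,
    seed: Option[Long] = None) {

  require(size > 0, "Positive RDD size required.")

  def withNumPartitions(n: Int): RandomRDDParams = {
    require(n > 0, "Positive number of partitions required.")
    copy(numPartitions = Some(n))
  }

  def withSeed(s: Long): RandomRDDParams = copy(seed = Some(s))

  // Fills in defaults at the end; a real API would read the parallelism from the SparkContext.
  def resolve(defaultParallelism: Int): (Long, Int, Long) =
    (size, numPartitions.getOrElse(defaultParallelism), seed.getOrElse(Random.nextLong()))
}
```

A single `uniformRDD(sc, params)` entry point could then forward the resolved triple to the four-argument method. That design choice is an assumption, not something the PR specifies.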