[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2455 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61220683 LGTM. Merged into master. Thanks for implementing gap sampling!
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61199386 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22576/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61199382 [Test build #22576 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22576/consoleFull) for PR 2455 at commit [`72496bc`](https://github.com/apache/spark/commit/72496bc93d5da0465f2c50255f281741d6d47bec).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61192204 [Test build #22576 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22576/consoleFull) for PR 2455 at commit [`72496bc`](https://github.com/apache/spark/commit/72496bc93d5da0465f2c50255f281741d6d47bec).
* This patch merges cleanly.
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61191942 @mengxr, I changed `fractionEpsilon` to `rngEpsilon`, which is more suggestive of its purpose. I also updated its documentation, which I think is now clearer about what `rngEpsilon` is for.
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19638500

```diff
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -52,57 +87,252 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter. */
+  require(
+    lb <= (ub + RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be <= upper bound ($ub)")
+  require(
+    lb >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be >= 0.0")
+  require(
+    ub <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Upper bound ($ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }
 
   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliCellSampler[T] =
+    new BernoulliCellSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliCellSampler[T](lb, ub, complement)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon)
+      && fraction <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be on interval [0, 1]")
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+  private val rng: Random = RandomSampler.newDefaultRNG
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.fractionEpsilon)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** Epsilon slop to avoid failure from floating point jitter. */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be >= 0")
 
-  private[random] var rng = new PoissonDistribution(mean)
+  // PoissonDistribution throws an exception when fraction <= 0
+  // If fraction is <= 0, Iterator.empty is used below, so we can use any placeholder value.
+  private val rng = new PoissonDistribution(if (fraction > 0.0) fraction else 1.0)
+  private val rngGap = RandomSampler.newDefaultRNG
 
   override def setSeed(seed: Long) {
-    rng = new PoissonDistribution(mean)
     rng.reseedRandomGenerator(seed)
+    rngGap.setSeed(seed)
   }
 
   override de
```
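The renamed `BernoulliCellSampler` accepts an item when its uniform draw lands in the cell `[lb, ub)`, and `cloneComplement()` flips that test; given the same seed, a sampler and its complement therefore partition the input into disjoint parts. A small self-contained sketch of that semantics (the `cellSample` helper is hypothetical, not the PR code; it uses the original `^ complement` form for brevity):

```scala
import scala.util.Random

// Hypothetical helper illustrating the cell-sampling semantics:
// each item draws x in [0, 1); it is kept if lb <= x < ub, XOR'd
// with the complement flag.
def cellSample(xs: Seq[Int], lb: Double, ub: Double,
               complement: Boolean, seed: Long): Seq[Int] = {
  val rng = new Random(seed)  // same seed => same per-item draws
  xs.filter { _ =>
    val x = rng.nextDouble()
    (x >= lb && x < ub) != complement  // boolean XOR with complement
  }
}

val data = (1 to 1000).toList
val inCell  = cellSample(data, 0.0, 0.25, complement = false, seed = 7L)
val outCell = cellSample(data, 0.0, 0.25, complement = true,  seed = 7L)
// with identical seeds the two samplers partition the input exactly
assert((inCell ++ outCell).sorted == data)
```

This disjoint-partition property is what makes the cell sampler suitable for seeded random splits, where every input element must land in exactly one split.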
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19638435 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (quotes the same RandomSampler.scala hunk shown above; the comment text is truncated in the archive)
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19636555

```diff
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -38,13 +41,45 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers. */
+  def newDefaultRNG: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum.
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
```
--- End diff --
0.5 is what I recommend as an initial guess if one is using a new RNG. (0.4 is what I got by experimenting with the current RNG)
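The gap sampling optimization being tuned here draws the size of the gap between accepted elements from a geometric distribution, so the sampler pays roughly one RNG call per accepted element rather than one per input element. A minimal standalone sketch of the idea (illustrative names, not the PR's actual `GapSamplingIterator`; the clamping constant plays the role of `rngEpsilon`, keeping `log(u)` finite when the RNG returns 0):

```scala
import scala.util.Random

// Sketch of gap sampling for a Bernoulli fraction f in (0, 1).
// The gap k before the next accepted element is geometric:
// P(k = j) = f * (1 - f)^j, realized as floor(log(u) / log(1 - f)).
def gapSample[T](items: Iterator[T], f: Double, rng: Random,
                 epsilon: Double = 5e-11): Iterator[T] = {
  require(f > 0.0 && f < 1.0, s"fraction ($f) must be in (0, 1)")
  val lnq = math.log1p(-f)  // log(1 - f), computed stably
  new Iterator[T] {
    private var nxt: Option[T] = None
    private def advance(): Unit = {
      // clamp u away from 0 so log(u) is finite (the role of rngEpsilon)
      val u = math.max(rng.nextDouble(), epsilon)
      var k = (math.log(u) / lnq).toInt  // geometric gap: elements to skip
      while (k > 0 && items.hasNext) { items.next(); k -= 1 }
      nxt = if (items.hasNext) Some(items.next()) else None
    }
    advance()
    def hasNext: Boolean = nxt.isDefined
    def next(): T = { val v = nxt.get; advance(); v }
  }
}
```

At small fractions most elements are skipped without touching the RNG at all, which is why the optimization only pays off below a threshold like the experimentally measured 0.4; above it, the per-gap `log` call costs more than it saves.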
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629166 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (on the line `if (fraction <= 0.0) {` in `BernoulliSampler.sample`; the full hunk is quoted above) --- End diff -- `<= fractionEpsilon`? See my comment for `PoissonSampler`.
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629120 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (quotes the same hunk shown above; the comment text is truncated in the archive)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629128 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (quotes the same hunk shown above; the comment text is truncated in the archive)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629116 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (quotes the same hunk shown above; the comment text is truncated in the archive)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629129

```diff
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -140,10 +140,9 @@ public void intersection() {
   public void sample() {
     List ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
     JavaRDD rdd = sc.parallelize(ints);
-    JavaRDD sample20 = rdd.sample(true, 0.2, 11);
-    // expected 2 but of course result varies randomly a bit
-    Assert.assertEquals(1, sample20.count());
-    JavaRDD sample20NoReplacement = rdd.sample(false, 0.2, 11);
+    JavaRDD sample20 = rdd.sample(true, 0.2, 3);
+    Assert.assertEquals(2, sample20.count());
+    JavaRDD sample20NoReplacement = rdd.sample(false, 0.2, 5);
```
--- End diff --
`NoReplacement` -> `WithoutReplacement` or `WR`
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629117 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629123 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629019

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -38,13 +41,45 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers. */
+  def newDefaultRNG: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum.
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.

--- End diff --

`0.5` -> `0.4`?
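The threshold discussed above governs when the gap sampling optimization kicks in. The idea can be sketched as follows; this is an illustrative standalone version, not Spark's `GapSamplingIterator`, and all names here are mine. Instead of drawing one uniform per element, it draws the size of the "gap" to the next accepted element: under Bernoulli(f), the gap is geometric with P(gap = k) = f * (1 - f)^k, so gap = floor(log(u) / log(1 - f)). One RNG draw per *accepted* element is why small fractions get much cheaper.

```scala
import scala.util.Random

// Illustrative gap sampling sketch (not Spark's internal implementation).
def gapSample[T](data: Iterator[T], f: Double, rng: Random): Iterator[T] = {
  require(f > 0.0 && f < 1.0, s"fraction ($f) must be in (0, 1)")
  val lnq = math.log1p(-f) // log(1 - f), computed stably for small f
  new Iterator[T] {
    // Skip a geometrically distributed number of elements.
    private def skipGap(): Unit = {
      val u = math.max(rng.nextDouble(), 1e-12) // guard against log(0)
      var k = (math.log(u) / lnq).toInt         // geometric gap size
      while (k > 0 && data.hasNext) { data.next(); k -= 1 }
    }
    skipGap() // position at the first accepted element
    override def hasNext: Boolean = data.hasNext
    override def next(): T = { val v = data.next(); skipGap(); v }
  }
}
```

The expected number of accepted elements is f times the input size, the same as per-element Bernoulli filtering, but with far fewer RNG calls when f is small.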
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629028

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.fractionEpsilon)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))

--- End diff --

`(rng.nextDouble() <= fraction)` -> `rng.nextDouble() <= fraction`
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629030 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19629022

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })

--- End diff --

This is the main style issue with this update. In Spark, we use the following style:

~~~
items.filter { item =>
  ...
}
~~~

Please update other places as well.
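Applied to the complement branch under review, the requested brace style reads as below. This is a self-contained sketch with `lb`/`ub` passed as parameters; the function name is illustrative, not Spark's.

```scala
import scala.util.Random

// Complement cell-sampler predicate in the preferred Spark style:
// braces with a named closure parameter, not `items.filter(item => {...})`.
def complementSample[T](items: Iterator[T], lb: Double, ub: Double, rng: Random): Iterator[T] =
  items.filter { item =>
    val x = rng.nextDouble()
    (x < lb) || (x >= ub)
  }
```

Note the behavior is identical to the parenthesized form; only the closure syntax changes.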
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61031984 @mengxr latest updates are rebased and passing Jenkins
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61014891 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22475/
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61014883

[Test build #22475 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22475/consoleFull) for PR 2455 at commit [`46cb9fa`](https://github.com/apache/spark/commit/46cb9fa45182e4e194dada91af7e82e9a4c5fb76).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)`
  * `class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T]`
  * `class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T]`
  * `class GapSamplingIterator[T: ClassTag](var data: Iterator[T], f: Double,`
  * `class GapSamplingReplacementIterator[T: ClassTag](var data: Iterator[T], f: Double,`
  * `class DeferredObjectAdapter(oi: ObjectInspector) extends DeferredObject`
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-61005185 [Test build #22475 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22475/consoleFull) for PR 2455 at commit [`46cb9fa`](https://github.com/apache/spark/commit/46cb9fa45182e4e194dada91af7e82e9a4c5fb76). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-60978274 Was about to push, but it looks like the commit for SPARK-4022 broke my updates, so I'm going to have to make more edits to rebase
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19509394 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable * @tparam T item type */ @DeveloperApi -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) extends RandomSampler[T, T] { - private[random] var rng: Random = new XORShiftRandom + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps: Double = RandomSampler.epsArgs + require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)") + require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0") + require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0") - def this(ratio: Double) = this(0.0d, ratio) + private val rng: Random = new XORShiftRandom override def setSeed(seed: Long) = rng.setSeed(seed) override def sample(items: Iterator[T]): Iterator[T] = { -items.filter { item => - val x = rng.nextDouble() - (x >= lb && x < ub) ^ complement +if (ub - lb <= 0.0) { + if (complement) items else Iterator.empty +} else { + if (complement) { +items.filter(item => { + val x = rng.nextDouble() + (x < lb) || (x >= ub) +}) + } else { +items.filter(item => { + val x = rng.nextDouble() + (x >= lb) && (x < ub) +}) + } } } /** * Return a sampler that is the complement of the range specified of the current sampler. 
*/ - def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement) + def cloneComplement(): BernoulliPartitionSampler[T] = +new BernoulliPartitionSampler[T](lb, ub, !complement) + + override def clone = new BernoulliPartitionSampler[T](lb, ub, complement) +} - override def clone = new BernoulliSampler[T](lb, ub, complement) + +/** + * :: DeveloperApi :: + * A sampler based on Bernoulli trials. + * + * @param fraction the sampling fraction, aka Bernoulli sampling probability + * @tparam T item type + */ +@DeveloperApi +class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] { + + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps: Double = RandomSampler.epsArgs + require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps), +"Sampling fraction must be on interval [0, 1]") + + private val rng: Random = RandomSampler.rngDefault + + override def setSeed(seed: Long) = rng.setSeed(seed) + + override def sample(items: Iterator[T]): Iterator[T] = { +if (fraction <= 0.0) { + Iterator.empty +} else if (fraction >= 1.0) { + items +} else if (fraction <= RandomSampler.gsmDefault) { + new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault) +} else { + items.filter(_ => (rng.nextDouble() <= fraction)) +} + } + + override def clone = new BernoulliSampler[T](fraction) } + /** * :: DeveloperApi :: - * A sampler based on values drawn from Poisson distribution. + * A sampler for sampling with replacement, based on values drawn from Poisson distribution. 
* - * @param mean Poisson mean + * @param fraction the sampling fraction (with replacement) * @tparam T item type */ @DeveloperApi -class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { +class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] { - private[random] var rng = new Poisson(mean, new DRand) + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps = RandomSampler.epsArgs + require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0") + + private var curseed: Long = System.nanoTime + private var rng = new Poisson(fraction, new DRand(curseed.toInt)) override def setSeed(seed: Long) { -rng = new Poisson(mean, new DRand(seed.toInt)) +curseed = seed +rng = new Poisson(fraction, new DRand(seed.toInt)) } override def sample(items: Iterator[T]): Iterator[T] = { -items.flatMap { item => - val count = rng.nextInt() - if (count == 0) { -Iterator.empty - } else { -Iterator.fill(count)(item) - } +if (fraction <= 0.0) { + It
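The with-replacement semantics in the PoissonSampler diff above can be sketched without any external RNG library. Knuth's product-of-uniforms algorithm is adequate for the small means used as sampling fractions; the helper names below are illustrative, not Spark's.

```scala
import scala.util.Random

// Poisson(lambda) draw via Knuth's algorithm. Runtime grows with lambda,
// which is fine for typical sampling fractions.
def poissonDraw(lambda: Double, rng: Random): Int = {
  val limit = math.exp(-lambda)
  var k = 0
  var p = 1.0
  while ({ p *= rng.nextDouble(); p > limit }) k += 1
  k
}

// Sampling with replacement: each item is emitted count times,
// where count ~ Poisson(fraction), so items may appear 0, 1, or more times.
def poissonSample[T](items: Iterator[T], fraction: Double, seed: Long): Iterator[T] = {
  if (fraction <= 0.0) Iterator.empty
  else {
    val rng = new Random(seed)
    items.flatMap { item =>
      val count = poissonDraw(fraction, rng)
      if (count == 0) Iterator.empty else Iterator.fill(count)(item)
    }
  }
}
```

The expected output size is fraction times the input size, matching the `fraction` parameter's documented meaning for sampling with replacement.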
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r19509239

    --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
    @@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
      * @tparam T item type
      */
     @DeveloperApi
    -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
    +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
       extends RandomSampler[T, T] {

    -  private[random] var rng: Random = new XORShiftRandom
    +  /** epsilon slop to avoid failure from floating point jitter */
    +  @transient val eps: Double = RandomSampler.epsArgs
    +  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
    +  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
    +  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")

    -  def this(ratio: Double) = this(0.0d, ratio)
    +  private val rng: Random = new XORShiftRandom

       override def setSeed(seed: Long) = rng.setSeed(seed)

       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.filter { item =>
    -      val x = rng.nextDouble()
    -      (x >= lb && x < ub) ^ complement
    +    if (ub - lb <= 0.0) {
    +      if (complement) items else Iterator.empty
    +    } else {
    +      if (complement) {
    +        items.filter(item => {
    +          val x = rng.nextDouble()
    +          (x < lb) || (x >= ub)
    +        })
    +      } else {
    +        items.filter(item => {
    +          val x = rng.nextDouble()
    +          (x >= lb) && (x < ub)
    +        })
    +      }
         }
       }

       /**
        * Return a sampler that is the complement of the range specified of the current sampler.
        */
    -  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
    +  def cloneComplement(): BernoulliPartitionSampler[T] =
    +    new BernoulliPartitionSampler[T](lb, ub, !complement)
    +
    +  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
    +}

    -  override def clone = new BernoulliSampler[T](lb, ub, complement)
    +
    +/**
    + * :: DeveloperApi ::
    + * A sampler based on Bernoulli trials.
    + *
    + * @param fraction the sampling fraction, aka Bernoulli sampling probability
    + * @tparam T item type
    + */
    +@DeveloperApi
    +class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
    +
    +  /** epsilon slop to avoid failure from floating point jitter */
    +  @transient val eps: Double = RandomSampler.epsArgs
    +  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
    +    "Sampling fraction must be on interval [0, 1]")
    +
    +  private val rng: Random = RandomSampler.rngDefault
    +
    +  override def setSeed(seed: Long) = rng.setSeed(seed)
    +
    +  override def sample(items: Iterator[T]): Iterator[T] = {
    +    if (fraction <= 0.0) {
    +      Iterator.empty
    +    } else if (fraction >= 1.0) {
    +      items
    +    } else if (fraction <= RandomSampler.gsmDefault) {
    +      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
    +    } else {
    +      items.filter(_ => (rng.nextDouble() <= fraction))
    +    }
    +  }
    +
    +  override def clone = new BernoulliSampler[T](fraction)
     }

    +
     /**
      * :: DeveloperApi ::
    - * A sampler based on values drawn from Poisson distribution.
    + * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
      *
    - * @param mean Poisson mean
    + * @param fraction the sampling fraction (with replacement)
      * @tparam T item type
      */
     @DeveloperApi
    -class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
    +class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {

    -  private[random] var rng = new Poisson(mean, new DRand)
    +  /** epsilon slop to avoid failure from floating point jitter */
    +  @transient val eps = RandomSampler.epsArgs
    +  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
    +
    +  private var curseed: Long = System.nanoTime
    +  private var rng = new Poisson(fraction, new DRand(curseed.toInt))

       override def setSeed(seed: Long) {
    -    rng = new Poisson(mean, new DRand(seed.toInt))
    +    curseed = seed
    +    rng = new Poisson(fraction, new DRand(seed.toInt))
       }

       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.flatMap { item =>
    -      val count = rng.nextInt()
    -      if (count == 0) {
    -        Iterator.empty
    -      } else {
    -        Iterator.fill(count)(item)
    -      }
    +    if (fraction <= 0.0) {
    +      It
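The diff above routes small sampling fractions through a GapSamplingIterator: instead of drawing one uniform per element, it draws the size of the gap to the next accepted element, which is geometrically distributed with success probability equal to the sampling fraction. A minimal Python sketch of that idea (illustrative names only; this is not the Spark implementation):

```python
import math
import random

def gap_sample(items, fraction, rng=None):
    """Bernoulli sampling via gap lengths: one RNG draw yields the number
    of elements to skip before the next accepted element, so a single draw
    can advance past many elements when `fraction` is small."""
    rng = rng or random.Random()
    it = iter(items)
    lnq = math.log1p(-fraction)  # ln(1 - fraction), negative for 0 < fraction < 1
    while True:
        # floor(ln(U) / ln(1 - f)) is a geometric(f) gap size, k >= 0
        gap = int(math.log(rng.random()) / lnq)
        try:
            for _ in range(gap):
                next(it)        # skip the gap
            yield next(it)      # accept the next element
        except StopIteration:
            return
```

This is why the Bernoulli sampler in the diff only takes the gap-sampling path below a threshold (`RandomSampler.gsmDefault`): for large fractions the gaps are mostly zero, and the plain per-element filter is cheaper.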
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19509062 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- (same hunk as quoted above)
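The PR's PoissonSampler implements sampling *with replacement*: each element is emitted k times, where k is drawn from a Poisson distribution whose mean is the sampling fraction. A hedged Python sketch of the same idea (Knuth's inversion draw stands in for the colt Poisson generator; names are illustrative):

```python
import math
import random

def poisson_sample(items, fraction, rng=None):
    """Sampling with replacement: emit each element k times, with
    k ~ Poisson(fraction). Elements may appear zero, one, or several
    times in the output."""
    rng = rng or random.Random()

    def poisson_draw(lam):
        # Knuth's method: count uniform draws until their running
        # product drops below e^-lam. Fine for small means; not
        # suitable for large lam.
        limit, k, p = math.exp(-lam), 0, 1.0
        while True:
            p *= rng.random()
            if p <= limit:
                return k
            k += 1

    for item in items:
        for _ in range(poisson_draw(fraction)):
            yield item
```

This mirrors the Scala `rng.nextInt()` per-element count in the quoted diff: a count of zero drops the element, a count above one replicates it.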
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-60793728 @erikerlandson Great! Thanks for the heads up.
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-60793304 @mengxr, coincidentally I'm working through the PR comments today; I plan to have an update pushed this evening.
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-60789726 @erikerlandson The feature freeze deadline for v1.2 is this Sat. Just want to check with you and see whether you are going to update the PR this week.
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r19047174 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) --- End diff -- Sounds good.
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18791243 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) --- End diff -- My opinion on naming is still that `BernoulliSampler` ought to refer to the object that does straight Bernoulli sampling (without added data partitioning semantics). It seems misleading to have an object called `BernoulliSampler` that introduces additional class parameters and extra computation under the hood to support a specialized use case. Naming that class `BernoulliCellSampler` captures the idea that it is the variation with a specialized purpose.
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18790724 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) --- End diff -- I agree, "Partition" is overloaded in Spark, although it's equally correct in both contexts. Maybe "Cell" is a decent alternative: http://en.wikipedia.org/wiki/Partition_of_a_set So it would be `BernoulliCellSampler`
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18537414 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) --- End diff -- `Partition` is slightly confusing in the Spark context, but I don't have good suggestions. Another problem with this change is that this is a public API. Though it is still marked as DeveloperApi, it would be better if we don't change it unless it is necessary. For example, we can add `GapBernoulliSampler` and keep this one.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18536682

    --- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala ---
    @@ -18,96 +18,547 @@ package org.apache.spark.util.random

     import java.util.Random
    +import scala.collection.mutable.ArrayBuffer

     import cern.jet.random.Poisson
    -import org.scalatest.{BeforeAndAfter, FunSuite}
    -import org.scalatest.mock.EasyMockSugar
    -
    -class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
    -
    -  val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    -
    -  var random: Random = _
    -  var poisson: Poisson = _
    -
    -  before {
    -    random = mock[Random]
    -    poisson = mock[Poisson]
    -  }
    -
    -  test("BernoulliSamplerWithRange") {
    -    expecting {
    -      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
    -        random.nextDouble().andReturn(x)
    -      }
    -    }
    -    whenExecuting(random) {
    -      val sampler = new BernoulliSampler[Int](0.25, 0.55)
    -      sampler.rng = random
    -      assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
    -    }
    -  }
    -
    -  test("BernoulliSamplerWithRangeInverse") {
    -    expecting {
    -      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
    -        random.nextDouble().andReturn(x)
    -      }
    -    }
    -    whenExecuting(random) {
    -      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
    -      sampler.rng = random
    -      assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
    -    }
    -  }
    -
    -  test("BernoulliSamplerWithRatio") {
    -    expecting {
    -      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
    -        random.nextDouble().andReturn(x)
    -      }
    -    }
    -    whenExecuting(random) {
    -      val sampler = new BernoulliSampler[Int](0.35)
    -      sampler.rng = random
    -      assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
    -    }
    -  }
    -
    -  test("BernoulliSamplerWithComplement") {
    -    expecting {
    -      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
    -        random.nextDouble().andReturn(x)
    -      }
    -    }
    -    whenExecuting(random) {
    -      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
    -      sampler.rng = random
    -      assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
    -    }
    -  }
    -
    -  test("BernoulliSamplerSetSeed") {
    -    expecting {
    -      random.setSeed(10L)
    -    }
    -    whenExecuting(random) {
    -      val sampler = new BernoulliSampler[Int](0.2)
    -      sampler.rng = random
    -      sampler.setSeed(10L)
    -    }
    -  }
    -
    -  test("PoissonSampler") {
    -    expecting {
    -      for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
    -        poisson.nextInt().andReturn(x)
    -      }
    -    }
    -    whenExecuting(poisson) {
    -      val sampler = new PoissonSampler[Int](0.2)
    -      sampler.rng = poisson
    -      assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6))
    -    }
    +import cern.jet.random.engine.DRand
    +
    +import org.scalatest.{FunSuite, Matchers}
    +
    +class RandomSamplerSuite extends FunSuite with Matchers {
    +  // My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test
    +  // between the random samplers and simple reference samplers (known to work correctly).
    +  // The sampling gap sizes between chosen samples should show up as having the same
    +  // distributions between test and reference, if things are working properly. That is,
    +  // the KS test will fail to strongly reject the null hypothesis that the distributions of
    +  // sampling gaps are the same.
    +  // There are no actual KS tests implemented for scala (that I can find) - and so what I
    +  // have done here is pre-compute "D" - the KS statistic - that corresponds to a "weak"
    +  // p-value for a particular sample size. I can then test that my measured KS stats
    +  // are less than D. Computing D-values is easy, and implemented below.
    +  //
    +  // I used the scipy 'kstwobign' distribution to pre-compute my D value:
    +  //
    +  // def ksdval(q=0.1, n=1000):
    +  //     en = np.sqrt(float(n) / 2.0)
    +  //     return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en)
    +  //
    +  // When comparing KS stats I take the median of a small number of independent test runs
    +  // to compensate for the issue that any sampled statistic will show "false positive" with
    +  // some probability. Even when two distributions are the same, they will register as
    +  // different 10% of the time at a p-value of 0.1
    +
    +  // This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
    +
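The methodology quoted above compares measured KS statistics against a precomputed critical D. The statistic itself is the maximum gap between two empirical CDFs; a small self-contained sketch (the scipy-based threshold computation in the comment is left as-is, and this is not the suite's actual code):

```python
import bisect

def ks_statistic(xs, ys):
    """Two-sample Kolmogorov-Smirnov statistic: the maximum absolute
    difference between the empirical CDFs of xs and ys, evaluated at
    every observed point. 'Same distribution' is rejected when this
    exceeds the critical D for the chosen p-value and sample size."""
    xs, ys = sorted(xs), sorted(ys)
    nx, ny = len(xs), len(ys)
    d = 0.0
    for v in xs + ys:
        fx = bisect.bisect_right(xs, v) / nx  # empirical CDF of xs at v
        fy = bisect.bisect_right(ys, v) / ny  # empirical CDF of ys at v
        d = max(d, abs(fx - fy))
    return d
```

In the suite, D is computed between the gap-size distributions of a sampler under test and a reference sampler, and the median over several runs is compared against the precomputed threshold (roughly 0.05 for p = 0.1 and n = 1000, per the `kstwobign` formula in the comment).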
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536690 --- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala --- (same hunk as quoted above)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536685 --- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala --- (same hunk as quoted above)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536697 --- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala --- (same hunk as quoted above)
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536671

--- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala ---
@@ -18,96 +18,547 @@
 package org.apache.spark.util.random

 import java.util.Random

+import scala.collection.mutable.ArrayBuffer
+
 import cern.jet.random.Poisson
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
-
-class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
-
-  val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
-
-  var random: Random = _
-  var poisson: Poisson = _
-
-  before {
-    random = mock[Random]
-    poisson = mock[Poisson]
-  }
-
-  test("BernoulliSamplerWithRange") {
-    expecting {
-      for (x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
-    }
-  }
-
-  test("BernoulliSamplerWithRangeInverse") {
-    expecting {
-      for (x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerWithRatio") {
-    expecting {
-      for (x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.35)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
-    }
-  }
-
-  test("BernoulliSamplerWithComplement") {
-    expecting {
-      for (x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerSetSeed") {
-    expecting {
-      random.setSeed(10L)
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.2)
-      sampler.rng = random
-      sampler.setSeed(10L)
-    }
-  }
-
-  test("PoissonSampler") {
-    expecting {
-      for (x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
-        poisson.nextInt().andReturn(x)
-      }
-    }
-    whenExecuting(poisson) {
-      val sampler = new PoissonSampler[Int](0.2)
-      sampler.rng = poisson
-      assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6))
-    }
+import cern.jet.random.engine.DRand
+
+import org.scalatest.{FunSuite, Matchers}
+
+class RandomSamplerSuite extends FunSuite with Matchers {
+  // My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test
+  // between the random samplers and simple reference samplers (known to work correctly).
+  // The sampling gap sizes between chosen samples should show up as having the same
+  // distributions between test and reference, if things are working properly. That is,
+  // the KS test will fail to strongly reject the null hypothesis that the distributions of
+  // sampling gaps are the same.
+  // There are no actual KS tests implemented for Scala (that I can find), so what I
+  // have done here is pre-compute "D" - the KS statistic - that corresponds to a "weak"
+  // p-value for a particular sample size. I can then test that my measured KS stats
+  // are less than D. Computing D-values is easy, and implemented below.
+  //
+  // I used the scipy 'kstwobign' distribution to pre-compute my D value:
+  //
+  //     def ksdval(q=0.1, n=1000):
+  //         en = np.sqrt(float(n) / 2.0)
+  //         return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en)
+  //
+  // When comparing KS stats I take the median of a small number of independent test runs
+  // to compensate for the issue that any sampled statistic will show "false positives" with
+  // some probability. Even when two distributions are the same, they will register as
+  // different 10% of the time at a p-value of 0.1.
+
+  // This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
+
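The methodology described in the quoted comment can be sketched end-to-end in plain Python. This is a hedged illustration, not the suite's actual code: the pure-Python inversion of the Kolmogorov survival function stands in for scipy's `kstwobign.isf`, and `sampling_gaps`/`ks_two_sample_d` are hypothetical helper names.

```python
import math
import random

def ks_two_sample_d(xs, ys):
    # Two-sample KS statistic: the largest vertical gap between the
    # two empirical CDFs. Ties are handled by advancing both cursors
    # past the shared value before measuring the gap.
    xs, ys = sorted(xs), sorted(ys)
    n, m = len(xs), len(ys)
    i = j = 0
    d = 0.0
    while i < n and j < m:
        if xs[i] < ys[j]:
            i += 1
        elif xs[i] > ys[j]:
            j += 1
        else:
            v = xs[i]
            while i < n and xs[i] == v:
                i += 1
            while j < m and ys[j] == v:
                j += 1
        d = max(d, abs(i / n - j / m))
    return d

def kolmogorov_sf(x):
    # Survival function of the asymptotic KS null distribution:
    # P(K > x) = 2 * sum_{k>=1} (-1)^(k-1) * exp(-2 k^2 x^2)
    return 2.0 * sum((-1) ** (k - 1) * math.exp(-2.0 * k * k * x * x)
                     for k in range(1, 101))

def ks_d_threshold(q=0.1, n=1000):
    # Mirrors the quoted scipy recipe: invert kolmogorov_sf(x) = q by
    # bisection (the function is decreasing on [0.1, 3]), then apply the
    # same finite-sample correction en + 0.12 + 0.11 / en.
    lo, hi = 0.1, 3.0
    for _ in range(100):
        mid = (lo + hi) / 2.0
        if kolmogorov_sf(mid) > q:
            lo = mid
        else:
            hi = mid
    en = math.sqrt(n / 2.0)
    return lo / (en + 0.12 + 0.11 / en)

def sampling_gaps(rate, n, rng):
    # Gap sizes between consecutive accepted items under Bernoulli(rate)
    # sampling -- the quantity whose distribution the suite compares.
    gaps, gap = [], 0
    while len(gaps) < n:
        gap += 1
        if rng.random() < rate:
            gaps.append(gap)
            gap = 0
    return gaps

# Usage: compare a sampler's gap distribution against a reference sampler.
test_gaps = sampling_gaps(0.3, 1000, random.Random(2))
ref_gaps = sampling_gaps(0.3, 1000, random.Random(3))
d_stat = ks_two_sample_d(test_gaps, ref_gaps)
threshold = ks_d_threshold(q=0.1, n=1000)
# When both come from the same distribution, d_stat should usually fall
# below threshold -- and, as the comment notes, will still exceed it about
# 10% of the time, which is why the suite takes a median over several runs.
```

Note the design point from the quoted comment: the threshold is precomputed once for a fixed (q, n), so the test body only ever compares a measured statistic against a constant.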
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr left further inline comments on the same RandomSamplerSuite.scala diff (quoted diff identical to the one above): https://github.com/apache/spark/pull/2455#discussion_r18536659, #discussion_r18536688, #discussion_r18536693, #discussion_r18536679, #discussion_r18536673, #discussion_r18536657, #discussion_r18536653
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536639 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable /** take a random sample */ def sample(items: Iterator[T]): Iterator[U] + /** return a copy of the RandomSampler object */ override def clone: RandomSampler[T, U] = throw new NotImplementedError("clone() is not implemented.") } +@DeveloperApi +private [spark] +object RandomSampler { + /** Default random number generator used by random samplers */ + def rngDefault: Random = new XORShiftRandom --- End diff -- `newDefaultRNG` or `createDefaultRNG` may be better. When I read the code `RandomSampler.rngDefault` in other places, I'm not sure whether this returns the same instance for all calls. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
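The naming concern raised here can be illustrated with a small hypothetical sketch (Python stand-ins for the Scala names; `new_default_rng` mirrors the suggested `newDefaultRNG`):

```python
import random

class RandomSamplerDefaults:
    """Hypothetical stand-in for a RandomSampler companion object."""

    # A noun-like name such as "rngDefault" reads like a property, leaving
    # it unclear whether every caller shares one generator. A verb-prefixed
    # factory name ("newDefaultRNG" / "createDefaultRNG") signals that each
    # call constructs a fresh instance.
    @staticmethod
    def new_default_rng():
        # Fresh generator per call, so samplers cloned across tasks or
        # partitions never share (and race on) one RNG's internal state.
        return random.Random()

# Usage: two calls yield two independent generators, not a shared singleton.
a = RandomSamplerDefaults.new_default_rng()
b = RandomSamplerDefaults.new_default_rng()
assert a is not b
```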
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18536661 --- Diff: core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala --- @@ -18,96 +18,547 @@ package org.apache.spark.util.random import java.util.Random +import scala.collection.mutable.ArrayBuffer import cern.jet.random.Poisson -import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.mock.EasyMockSugar - -class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { - - val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9) - - var random: Random = _ - var poisson: Poisson = _ - - before { -random = mock[Random] -poisson = mock[Poisson] - } - - test("BernoulliSamplerWithRange") { -expecting { - for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { -random.nextDouble().andReturn(x) - } -} -whenExecuting(random) { - val sampler = new BernoulliSampler[Int](0.25, 0.55) - sampler.rng = random - assert(sampler.sample(a.iterator).toList == List(3, 4, 5)) -} - } - - test("BernoulliSamplerWithRangeInverse") { -expecting { - for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { -random.nextDouble().andReturn(x) - } -} -whenExecuting(random) { - val sampler = new BernoulliSampler[Int](0.25, 0.55, true) - sampler.rng = random - assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9)) -} - } - - test("BernoulliSamplerWithRatio") { -expecting { - for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { -random.nextDouble().andReturn(x) - } -} -whenExecuting(random) { - val sampler = new BernoulliSampler[Int](0.35) - sampler.rng = random - assert(sampler.sample(a.iterator).toList == List(1, 2, 3)) -} - } - - test("BernoulliSamplerWithComplement") { -expecting { - for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { -random.nextDouble().andReturn(x) - } -} -whenExecuting(random) { - val sampler = new BernoulliSampler[Int](0.25, 0.55, true) - sampler.rng = random - 
assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9)) -} - } - - test("BernoulliSamplerSetSeed") { -expecting { - random.setSeed(10L) -} -whenExecuting(random) { - val sampler = new BernoulliSampler[Int](0.2) - sampler.rng = random - sampler.setSeed(10L) -} - } - - test("PoissonSampler") { -expecting { - for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) { -poisson.nextInt().andReturn(x) - } -} -whenExecuting(poisson) { - val sampler = new PoissonSampler[Int](0.2) - sampler.rng = poisson - assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6)) -} +import cern.jet.random.engine.DRand + +import org.scalatest.{FunSuite, Matchers} + +class RandomSamplerSuite extends FunSuite with Matchers { + // My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test + // between the random samplers and simple reference samplers (known to work correctly). + // The sampling gap sizes between chosen samples should show up as having the same + // distributions between test and reference, if things are working properly. That is, + // the KS test will fail to strongly reject the null hypothesis that the distributions of + // sampling gaps are the same. + // There are no actual KS tests implemented for scala (that I can find) - and so what I + // have done here is pre-compute "D" - the KS statistic - that corresponds to a "weak" + // p-value for a particular sample size. I can then test that my measured KS stats + // are less than D. Computing D-values is easy, and implemented below. + // + // I used the scipy 'kstwobign' distribution to pre-compute my D value: + // + // def ksdval(q=0.1, n=1000): + // en = np.sqrt(float(n) / 2.0) + // return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en) + // + // When comparing KS stats I take the median of a small number of independent test runs + // to compensate for the issue that any sampled statistic will show "false positive" with + // some probability. 
Even when two distributions are the same, they will register as different 10% of the time at a p-value of 0.1. This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
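The D-value computation quoted above can be reproduced without scipy: the asymptotic Kolmogorov quantile has the closed form sqrt(-ln(q/2)/2), which matches what stats.kstwobign.isf(q) returns. The sketch below is illustrative Python (not the suite's Scala code); it also adds a two-sample D computation of the kind the suite would compare against the critical value.

```python
import math

def ks_critical_d(q=0.1, n=1000):
    """Critical KS statistic D for p-value q and sample size n, mirroring the
    quoted ksdval(): the closed-form asymptotic Kolmogorov quantile
    sqrt(-ln(q/2)/2) stands in for scipy's stats.kstwobign.isf(q)."""
    c = math.sqrt(-0.5 * math.log(q / 2.0))
    en = math.sqrt(n / 2.0)
    return c / (en + 0.12 + 0.11 / en)

def ks_two_sample_d(xs, ys):
    """Two-sample KS statistic: the largest gap between the two empirical CDFs.
    Ties are handled by advancing past all equal values before measuring."""
    xs, ys = sorted(xs), sorted(ys)
    i = j = 0
    d = 0.0
    while i < len(xs) and j < len(ys):
        v = min(xs[i], ys[j])
        while i < len(xs) and xs[i] == v:
            i += 1
        while j < len(ys) and ys[j] == v:
            j += 1
        d = max(d, abs(i / len(xs) - j / len(ys)))
    return d
```

The test then reduces to checking that the median measured D over a few runs stays below ks_critical_d(0.1, n), i.e. the KS test fails to reject that the gap distributions match.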
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57883154 @mengxr I'll be occupied next week, but I'll try to respond to your feedback as soon as possible the week after.
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57874204 @erikerlandson I didn't check the test code; I will find another time to make a pass over the tests. The implementation looks good to me apart from the minor inline comments. Could you create a JIRA for `.drop(...)` in sampling and link it to the upstream Scala JIRA, so we remember to update it later? Thanks!
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423491 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable * @tparam T item type */ @DeveloperApi -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false) extends RandomSampler[T, T] { - private[random] var rng: Random = new XORShiftRandom + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps: Double = RandomSampler.epsArgs + require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)") + require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0") + require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0") - def this(ratio: Double) = this(0.0d, ratio) + private val rng: Random = new XORShiftRandom override def setSeed(seed: Long) = rng.setSeed(seed) override def sample(items: Iterator[T]): Iterator[T] = { -items.filter { item => - val x = rng.nextDouble() - (x >= lb && x < ub) ^ complement +if (ub - lb <= 0.0) { + if (complement) items else Iterator.empty +} else { + if (complement) { +items.filter(item => { + val x = rng.nextDouble() + (x < lb) || (x >= ub) +}) + } else { +items.filter(item => { + val x = rng.nextDouble() + (x >= lb) && (x < ub) +}) + } } } /** * Return a sampler that is the complement of the range specified of the current sampler. 
*/ - def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement) + def cloneComplement(): BernoulliPartitionSampler[T] = +new BernoulliPartitionSampler[T](lb, ub, !complement) + + override def clone = new BernoulliPartitionSampler[T](lb, ub, complement) +} - override def clone = new BernoulliSampler[T](lb, ub, complement) + +/** + * :: DeveloperApi :: + * A sampler based on Bernoulli trials. + * + * @param fraction the sampling fraction, aka Bernoulli sampling probability + * @tparam T item type + */ +@DeveloperApi +class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] { + + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps: Double = RandomSampler.epsArgs + require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps), +"Sampling fraction must be on interval [0, 1]") + + private val rng: Random = RandomSampler.rngDefault + + override def setSeed(seed: Long) = rng.setSeed(seed) + + override def sample(items: Iterator[T]): Iterator[T] = { +if (fraction <= 0.0) { + Iterator.empty +} else if (fraction >= 1.0) { + items +} else if (fraction <= RandomSampler.gsmDefault) { + new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault) +} else { + items.filter(_ => (rng.nextDouble() <= fraction)) +} + } + + override def clone = new BernoulliSampler[T](fraction) } + /** * :: DeveloperApi :: - * A sampler based on values drawn from Poisson distribution. + * A sampler for sampling with replacement, based on values drawn from Poisson distribution. 
* - * @param mean Poisson mean + * @param fraction the sampling fraction (with replacement) * @tparam T item type */ @DeveloperApi -class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { +class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] { - private[random] var rng = new Poisson(mean, new DRand) + /** epsilon slop to avoid failure from floating point jitter */ + @transient val eps = RandomSampler.epsArgs + require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0") + + private var curseed: Long = System.nanoTime + private var rng = new Poisson(fraction, new DRand(curseed.toInt)) override def setSeed(seed: Long) { -rng = new Poisson(mean, new DRand(seed.toInt)) +curseed = seed +rng = new Poisson(fraction, new DRand(seed.toInt)) } override def sample(items: Iterator[T]): Iterator[T] = { -items.flatMap { item => - val count = rng.nextInt() - if (count == 0) { -Iterator.empty - } else { -Iterator.fill(count)(item) - } +if (fraction <= 0.0) { + Iterator.
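The new BernoulliSampler in the diff above delegates to a GapSamplingIterator when the fraction is small. The optimization: instead of drawing one uniform per input element, draw the geometrically distributed gap to the next accepted element, so RNG calls scale with the number of elements kept rather than the input size. A minimal Python sketch of the idea (illustrative only, not Spark's Scala implementation; assumes 0 < fraction < 1):

```python
import math
import random

def gap_sample(items, fraction, rng=random):
    """Bernoulli sampling via geometric gaps. Each accepted element is reached
    by skipping k elements, where k is geometric with success probability
    `fraction`; the marginal accept probability per element is still `fraction`."""
    out = []
    it = iter(items)
    lnq = math.log1p(-fraction)      # ln(1 - fraction), negative for fraction in (0, 1)
    while True:
        u = 1.0 - rng.random()       # uniform on (0, 1], avoids log(0)
        k = int(math.log(u) / lnq)   # geometric gap: P(k) = fraction * (1 - fraction)^k
        try:
            for _ in range(k):
                next(it)             # skip k elements without touching the RNG again
            out.append(next(it))     # keep the (k+1)-th element
        except StopIteration:
            return out
```

At fraction 0.1 this makes roughly one RNG call per ten input elements, which is where the speedup over per-element filtering comes from.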
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423498 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423493 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423500 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423504 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423499 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423485

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }
 
   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.gsmDefault) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
 
-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
 
   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.
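The gap sampling optimization under review can be sketched roughly as follows. This is an illustrative stand-in, not the PR's GapSamplingIterator: the class name is hypothetical, it uses scala.util.Random rather than Spark's XORShiftRandom, and it omits the epsilon handling the PR adds. The idea is to draw the size of the gap to the next kept element from a geometric distribution with success probability f, instead of drawing one uniform variate per input element:

```scala
import scala.util.Random

// Hypothetical sketch of gap sampling (not the PR's GapSamplingIterator):
// draw the gap to the next sampled element from a geometric distribution
// with success probability f, and skip that many elements at once.
class GapSamplingSketch[T](data: Iterator[T], f: Double, rng: Random) extends Iterator[T] {
  require(f > 0.0 && f < 1.0, "fraction must be in (0, 1)")
  private val lnq = math.log1p(-f) // log(1 - f), computed stably for small f
  private var current: Option[T] = None
  advance()

  // Skip a geometrically distributed number of elements, then keep one.
  // The explicit loop avoids Iterator.drop, which is lazy and would not
  // advance the underlying iterator until the result is consumed.
  private def advance(): Unit = {
    var gap = (math.log(rng.nextDouble()) / lnq).toInt
    while (gap > 0 && data.hasNext) { data.next(); gap -= 1 }
    current = if (data.hasNext) Some(data.next()) else None
  }

  override def hasNext: Boolean = current.isDefined
  override def next(): T = {
    val out = current.get
    advance()
    out
  }
}
```

For example, `new GapSamplingSketch((1 to 100000).iterator, 0.1, new Random(7L))` yields an ordered, duplicate-free sample of roughly 10% of the input while drawing only about one random number per *sampled* element, which is why the quoted BernoulliSampler only dispatches to gap sampling for small fractions.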
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423495
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423492
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423489
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423437

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
 
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "tradtional" bernoulli sampling is faster. The
--- End diff --

bernoulli -> `Bernoulli`

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
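The gap sampling cutoff documented in this hunk rests on a cost argument: plain Bernoulli filtering draws one random number per input element, while gap sampling draws roughly one per *sampled* element, because the number of elements consumed per sample is geometric with mean 1/f. A small empirical check of that mean (the helper below is hypothetical, not part of the PR; the PR's actual threshold constant is RandomSampler.gsmDefault):

```scala
import scala.util.Random

// Empirically estimate the mean number of input elements consumed per
// sampled element under gap sampling. Geometric theory says it is 1/f,
// so gap sampling does ~f * n RNG draws versus n for plain filtering.
def meanGap(f: Double, trials: Int, rng: Random): Double = {
  require(f > 0.0 && f < 1.0, "fraction must be in (0, 1)")
  val lnq = math.log1p(-f) // log(1 - f)
  var total = 0.0
  for (_ <- 1 to trials) {
    // floor(log(u) / log(1 - f)) skipped elements, plus the one kept
    total += math.floor(math.log(rng.nextDouble()) / lnq) + 1.0
  }
  total / trials
}
```

For f = 0.1, `meanGap(0.1, 200000, new Random(11L))` should land close to 1/f = 10, matching the intuition that gap sampling's advantage shrinks as the fraction grows toward the cutoff.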
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423464
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423484
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423470
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423457
--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423473

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
    }
  }

  /**
   * Return a sampler that is the complement of the range specified of the current sampler.
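The `PoissonSampler` in the hunks quoted in this thread implements sampling with replacement: each element is emitted k times with k drawn from Poisson(fraction), so the expected output size is fraction times the input size. A Python sketch of the same scheme (illustrative, not the Colt-backed Spark implementation; `poisson_knuth` is a hypothetical helper using Knuth's multiplication method, which is adequate for small means):

```python
import math
import random

def poisson_knuth(mean, rng=random.random):
    """Draw from Poisson(mean) via Knuth's multiplication method:
    multiply uniforms until the product falls below exp(-mean)."""
    limit = math.exp(-mean)
    k, p = 0, 1.0
    while True:
        p *= rng()
        if p <= limit:
            return k
        k += 1

def poisson_sample(items, fraction, rng=random.random):
    """Sample with replacement: emit each input item k times,
    where k ~ Poisson(fraction)."""
    if fraction <= 0.0:
        return
    for item in items:
        for _ in range(poisson_knuth(fraction, rng)):
            yield item
```

Unlike the Bernoulli samplers, an element can appear more than once in the output, which is what "with replacement" means for this API.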
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423487

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423468

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423448

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs

--- End diff --

use `RandomSampler.epsArgs` directly

---

If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
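The epsilon slop under discussion exists because a fraction that is mathematically inside [0, 1] can land a hair outside it after floating point arithmetic (for example when split weights are normalized by their sum). A hedged Python illustration of a tolerant bounds check (the tolerance 1e-6 is made up for the example, not Spark's `epsArgs` value):

```python
def check_fraction(fraction, eps=1e-6):
    """Bounds check with epsilon slop, then clamp onto [0, 1].
    The eps value here is illustrative only."""
    if not (0.0 - eps <= fraction <= 1.0 + eps):
        raise ValueError("Sampling fraction must be on interval [0, 1]")
    return min(1.0, max(0.0, fraction))

# A fraction that is mathematically 1.0 can overshoot in binary floating
# point: 0.1 + 0.2 rounds up to 0.30000000000000004.
f = (0.1 + 0.2) / 0.3
print(f > 1.0)            # jitter pushes f just above 1.0
print(check_fraction(f))  # tolerated by the slop and clamped back to 1.0
```

A strict `require(fraction <= 1.0)` would reject such inputs even though the caller did nothing wrong, which is why the checks in the diff widen the interval by `eps`.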
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423443

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic requires value > 0. An

--- End diff --

What do you mean by `value`?
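For context on the doc comment being questioned: the `value` appears to be the uniform draw that feeds the gap computation, since floor(log(u)/log(1-f)) diverges when u = 0 and an RNG's nextDouble can return exactly 0. A small Python illustration (the clamp value 5e-11 is illustrative, not necessarily Spark's `epsDefault`):

```python
import math

fraction = 0.3
lnq = math.log1p(-fraction)  # log(1 - fraction), negative

def gap(u, eps=5e-11):
    """Number of items to skip given a uniform draw u.
    The clamp keeps u away from 0, where log(u) is -inf."""
    return int(math.log(max(u, eps)) / lnq)

print(gap(0.5))  # a typical small gap
print(gap(0.0))  # still finite: the clamp bounds the worst-case skip
```

Without the clamp, a zero draw would produce an infinite gap and the iterator would silently consume its entire input.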
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423433

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum

--- End diff --

add `.` to the end or insert an empty line. You can check the generated doc by `sbt/sbt unidoc`.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423478

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423454

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18423461

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423474

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -53,56 +89,238 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0.0 - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1.0 + eps), "Upper bound (ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        })
+      } else {
+        items.filter(item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        })
+      }
     }
   }

   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
+}

-  override def clone = new BernoulliSampler[T](lb, ub, complement)

+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.gsmDefault) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.epsDefault)
+    } else {
+      items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }

 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {

-  private[random] var rng = new Poisson(mean, new DRand)
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))

   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.
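The new `BernoulliSampler` in the diff above routes small fractions through a `GapSamplingIterator`. For readers unfamiliar with the optimization, here is a hedged, self-contained sketch of the gap sampling idea (the name `gapSample` and the structure are illustrative, not Spark's actual implementation): instead of drawing one random number per input element, draw the size of the gap to the next accepted element from a geometric distribution with success probability `f`, so RNG cost scales with the number of sampled elements rather than the input size.

```scala
import scala.util.Random

// Illustrative sketch of gap sampling, NOT Spark's GapSamplingIterator.
// Each element is kept with probability f, but the gap between kept
// elements is drawn in one shot from a geometric distribution:
// P(gap = k) = (1 - f)^k * f, sampled as k = floor(log(u) / log(1 - f)).
def gapSample[T](items: Iterator[T], f: Double, rng: Random, eps: Double = 5e-11): Iterator[T] = {
  require(f > 0.0 && f < 1.0, s"fraction ($f) must be in (0, 1)")
  val lnq = math.log1p(-f) // log(1 - f), computed stably for small f
  new Iterator[T] {
    private var buffered: Option[T] = None

    // draw gap k, skip k elements, buffer the next one (if any)
    private def advance(): Unit = {
      val u = math.max(rng.nextDouble(), eps) // eps guards against log(0) = -Infinity
      var k = (math.log(u) / lnq).toInt
      while (k > 0 && items.hasNext) { items.next(); k -= 1 }
      buffered = if (items.hasNext) Some(items.next()) else None
    }

    advance()
    override def hasNext: Boolean = buffered.isDefined
    override def next(): T = {
      val out = buffered.get
      advance()
      out
    }
  }
}

// roughly f * n elements survive (statistical, not exact):
val n = gapSample((0 until 100000).iterator, 0.1, new Random(42L)).size
```

Each kept element still has marginal probability `f`, so the output is distributed like per-element Bernoulli filtering, only cheaper when `f` is small.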
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423429

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
--- End diff --

`@DeveloperApi` is not necessary for package private classes.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423444

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
+   * When sampling random floating point values the gap sampling logic requires value > 0. An
+   * optimal value for this parameter is at or near the minimum positive floating point value
+   * returned by nextDouble() for the RNG being used.
+   */
+  def epsDefault: Double = 5e-11
--- End diff --

The name `epsDefault` is not very clear to me. It could be a `val`.
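The epsilon discussed above is load-bearing for gap sampling. A hedged sketch of why it must be strictly positive (the names `gap`, `f`, and `lnq` here are illustrative stand-ins, not the actual Spark code): the gap computation divides `log(u)` by `log(1 - f)`, and if the RNG ever returned `u == 0.0` exactly, `log(0.0)` is `-Infinity` and the sampler would silently skip the entire remainder of the partition.

```scala
// Illustrative demonstration of the log(0) hazard the epsilon guards
// against. Values are assumptions chosen for the example, not Spark's.
val f = 0.1
val lnq = math.log1p(-f) // log(1 - f), a negative number

// gap to the next sampled element, clamping the uniform draw at eps
def gap(u: Double, eps: Double): Long = (math.log(math.max(u, eps)) / lnq).toLong

// unclamped, u == 0.0 would produce an infinite skip:
assert((math.log(0.0) / lnq).isPosInfinity)
// clamped at 5e-11 (near the smallest positive nextDouble() output),
// the gap is large but finite:
assert(gap(0.0, 5e-11) == gap(5e-11, 5e-11))
```

This also explains the comment in the quoted doc: the optimal epsilon sits at or near the minimum positive value the RNG's `nextDouble()` can return, so clamping changes the distribution as little as possible.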
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423440

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4
+
+  /**
+   * Default gap sampling epsilon
--- End diff --

ditto: add `.`
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423426

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -43,7 +43,8 @@
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliPartitionSampler,
+  SamplingUtils}
--- End diff --

2-space indentation may be better (thinking of the case when the package name is really long)
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423449

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +89,238 @@

  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter */
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub  + eps), "Lower bound (lb) must be <= upper bound (ub)")

--- End diff --

include the values of `lb` and `ub` in the message. remove the extra space between `ub` and `+`
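The suggestion above (put the offending values into the `require` message) is language-independent; a hypothetical Python sketch of the epsilon-slopped bounds check, with the values interpolated into the error text, might look like this (function name and `eps` default are illustrative, not from the patch):

```python
def check_bounds(lb, ub, eps=5e-11):
    """Validate a [lb, ub) sampling range with epsilon slop.

    Errors report the offending values, as requested in the review.
    """
    if lb > ub + eps:
        raise ValueError(f"Lower bound ({lb}) must be <= upper bound ({ub})")
    if lb < 0.0 - eps:
        raise ValueError(f"Lower bound ({lb}) must be >= 0.0")
    if ub > 1.0 + eps:
        raise ValueError(f"Upper bound ({ub}) must be <= 1.0")
```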
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423438

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -39,13 +42,46 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]

+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }

+@DeveloperApi
+private [spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers */
+  def rngDefault: Random = new XORShiftRandom
+
+  /**
+   * Default gap sampling maximum
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+   * optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+   * the optimal value higher. The most reliable way to determine this value for a given RNG
+   * is to experiment. I would expect a value of 0.5 to be close in most cases.
+   */
+  def gsmDefault: Double = 0.4

--- End diff --

We should be more specific on the name. `gsm` is not a common acronym (for sampling). I would recommend some names like `defaultMaxGapSamplingProb`. (This only applies to Bernoulli sampling but adding `Bernoulli` makes the name too long.) It could be a val: `val defaultMaxGapSamplingProb = 0.4`. (We don't need type info for primitive types.)
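For context on what the threshold discussed above gates: gap sampling replaces one uniform draw per element with one draw per *accepted* element, skipping ahead by a geometrically distributed gap. A hedged Python sketch of the idea follows; it is not the patch's Scala `GapSamplingIterator`, and all names, the seed, and the `eps` clamp are illustrative:

```python
import math
import random

def gap_sample(items, fraction, rng=random.Random(42), eps=5e-11):
    """Bernoulli sampling via geometric gap skipping.

    The gap to the next accepted element is
        gap = floor(log(u) / log(1 - fraction)),  u ~ Uniform(0, 1),
    so each element is accepted independently with probability
    `fraction`, using ~fraction * n RNG draws instead of n.
    `eps` keeps log(u) finite if the RNG returns exactly 0.
    """
    items = list(items)
    out = []
    lnq = math.log1p(-fraction)  # log(1 - fraction), requires 0 < fraction < 1
    i = 0
    while i < len(items):
        u = max(rng.random(), eps)
        i += int(math.log(u) / lnq)  # skip the geometric gap
        if i < len(items):
            out.append(items[i])
        i += 1
    return out
```

The expected output size is `fraction * n`, which is what the test below checks statistically.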
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18423427

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---

@@ -375,7 +376,9 @@ abstract class RDD[T: ClassTag](

     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
+      new PartitionwiseSampledRDD[T, T](this,

--- End diff --

The following style is commonly used in Spark:

~~~
new PartitionwiseSampledRDD[T, T](
  this, new BernoulliPartitionSampler[T](x(0), x(1)), true, seed)
~~~
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57866985

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21275/

Test FAILed.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57865927

test this please
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57865917

Jenkins, add to whitelist.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57628456

test this please
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on the pull request: https://github.com/apache/spark/pull/2455#issuecomment-57395923

test this please
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18241039

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +81,237 @@

  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  // epsilon slop to avoid failure from floating point jitter
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0d - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1d + eps), "Upper bound (ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    ub-lb match {
+      case f if (f <= 0d) => if (complement) items else Iterator.empty
+      case _ => {
+        if (complement) {
+          items.filter(item => {
+            val x = rng.nextDouble()
+            (x < lb) || (x >= ub)
+          })
+        } else {
+          items.filter(item => {
+            val x = rng.nextDouble()
+            (x >= lb) && (x < ub)
+          })
+        }
+      }
     }
   }

   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)

-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
 }

+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler based on Bernoulli trials.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  // epsilon slop to avoid failure from floating point jitter
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault

-  private[random] var rng = new Poisson(mean, new DRand)
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    fraction match {
+      case f if (f <= 0.0) => Iterator.empty
+      case f if (f >= 1.0) => items
+      case f if (f <= RandomSampler.gsmDefault) =>
+        new GapSamplingIterator(items, f, rng, RandomSampler.epsDefault)
+      case _ => items.filter(_ => (rng.nextDouble() <= fraction))
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
+ *
+ * @param fraction the sampling fraction (with replacement)
+ * @tparam T item type
+ */
+@DeveloperApi
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  // epsilon slop to avoid failure from floating point jitter
+  @transient val eps = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
+
+  private var curseed: Long = System.nanoTime
+  private var rng = new Poisson(fraction, new DRand(curseed.toInt))

   override def setSeed(seed: Long) {
-    rng = new Poisson(mean, new DRand(seed.toInt))
+    curseed = seed
+    rng = new Poisson(fraction, new DRand(seed.toInt))
   }

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.nextInt()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
+    fraction match {
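The `PoissonSampler` in the diff above implements sampling *with* replacement: each element is emitted `k` times, with `k` drawn from a Poisson distribution whose mean is the sampling fraction. A rough Python equivalent, using Knuth's inverse-transform Poisson draw in place of the colt `Poisson`/`DRand` classes (all names and the seed here are illustrative):

```python
import math
import random

def poisson_sample(items, fraction, rng=random.Random(7)):
    """Sampling with replacement: emit each element k ~ Poisson(fraction) times."""
    def poisson_draw(lam):
        # Knuth's algorithm: multiply uniforms until the product
        # drops below e^-lam. Fine for the small lam used here.
        threshold = math.exp(-lam)
        k, p = 0, 1.0
        while True:
            p *= rng.random()
            if p <= threshold:
                return k
            k += 1
    out = []
    for x in items:
        out.extend([x] * poisson_draw(fraction))
    return out
```

Since the per-element counts have mean `fraction`, the expected output size is `fraction * n`, just as for Bernoulli sampling, but duplicates can occur.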
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18202776

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +81,237 @@

 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  // epsilon slop to avoid failure from floating point jitter

--- End diff --

Ah right, I still had in mind that the check was using `ub-lb`, but it isn't. Below, in both proposed versions of the code, could you not also specially handle the case where ub-lb >= 1?
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18202539

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -43,9 +46,34 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

     throw new NotImplementedError("clone() is not implemented.")
 }

+private [spark]
+object RandomSampler {
+  // Default random number generator used by random samplers
+  def rngDefault: Random = new XORShiftRandom
+
+  // Default gap sampling maximum
+  // For sampling fractions <= this value, the gap sampling optimization will be applied.
+  // Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+  // optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+  // the optimal value higher. The most reliable way to determine this value for a given RNG
+  // is to experiment. I would expect a value of 0.5 to be close in most cases.
+  def gsmDefault: Double = 0.4
+
+  // Default gap sampling epsilon
+  // When sampling random floating point values the gap sampling logic requires value > 0. An
+  // optimal value for this parameter is at or near the minimum positive floating point value
+  // returned by nextDouble() for the RNG being used.
+  def epsDefault: Double = 5e-11

--- End diff --

Yeah I meant add .0 for clarity, then subtract `Double` for being overkill. I think this would be more consistent with Scala/Spark style that way, but at least I'd argue for the .0. Trivial here; more of a minor question for the whole code base.
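A small illustration of why `epsDefault` must be strictly positive: the gap-sampling advance takes a logarithm of the RNG output, and `nextDouble()` can in principle return exactly 0.0, making `log(u)` infinite. This hypothetical Python helper shows the clamp (the 5e-11 constant is the diff's value; the function name is made up for this sketch):

```python
import math

def advance(u, fraction, eps=5e-11):
    """Length of the geometric gap to the next accepted element.

    Clamping u away from 0 keeps log(u) finite without noticeably
    biasing the distribution, because eps is at or near the smallest
    positive value the RNG's nextDouble() can return.
    """
    u = max(u, eps)
    return int(math.log(u) / math.log1p(-fraction))
```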
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18189085

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +81,237 @@

 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  // epsilon slop to avoid failure from floating point jitter

--- End diff --

It doesn't affect the output, so I didn't spend the code/computation on clamping it. The only reason I put in the checks at all was because I felt like it might catch logic errors in calling code. For example, if somebody sampled with fraction '-1.0', it could cheerfully return an empty data set, but that would more likely be an error somewhere. But -0.0001 is more likely to be just rounding jitter.
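The point above is that the epsilon slop separates caller bugs (fraction = -1.0) from floating point jitter (fraction = -1e-9). The patch deliberately does not clamp, since jitter cannot change the output; purely for illustration, a hypothetical variant that validates *and* clamps could look like this (name and `eps` value are made up, not from the patch):

```python
def check_fraction(fraction, eps=1e-6):
    """Reject clear logic errors but tolerate floating point jitter,
    clamping jittered values back onto [0, 1]."""
    if not (0.0 - eps <= fraction <= 1.0 + eps):
        raise ValueError(f"Sampling fraction ({fraction}) must be on [0, 1]")
    return min(max(fraction, 0.0), 1.0)
```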
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18189003

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -43,9 +46,34 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable

     throw new NotImplementedError("clone() is not implemented.")
 }

+private [spark]
+object RandomSampler {
+  // Default random number generator used by random samplers
+  def rngDefault: Random = new XORShiftRandom
+
+  // Default gap sampling maximum
+  // For sampling fractions <= this value, the gap sampling optimization will be applied.
+  // Above this value, it is assumed that "traditional" Bernoulli sampling is faster. The
+  // optimal value for this will depend on the RNG. More expensive RNGs will tend to make
+  // the optimal value higher. The most reliable way to determine this value for a given RNG
+  // is to experiment. I would expect a value of 0.5 to be close in most cases.
+  def gsmDefault: Double = 0.4
+
+  // Default gap sampling epsilon
+  // When sampling random floating point values the gap sampling logic requires value > 0. An
+  // optimal value for this parameter is at or near the minimum positive floating point value
+  // returned by nextDouble() for the RNG being used.
+  def epsDefault: Double = 5e-11

--- End diff --

I don't see how the code can be any more clear than when the type is written out. That's as clear as it gets.
[GitHub] spark pull request: [SPARK-3250] Implement Gap Sampling optimizati...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/2455#discussion_r18188928

--- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---

@@ -53,56 +81,237 @@

  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {

-  private[random] var rng: Random = new XORShiftRandom
+  // epsilon slop to avoid failure from floating point jitter
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
+  require(lb >= (0d - eps), "Lower bound (lb) must be >= 0.0")
+  require(ub <= (1d + eps), "Upper bound (ub) must be <= 1.0")

-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom

   override def setSeed(seed: Long) = rng.setSeed(seed)

   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    ub-lb match {
+      case f if (f <= 0d) => if (complement) items else Iterator.empty
+      case _ => {
+        if (complement) {
+          items.filter(item => {
+            val x = rng.nextDouble()
+            (x < lb) || (x >= ub)
+          })
+        } else {
+          items.filter(item => {
+            val x = rng.nextDouble()
+            (x >= lb) && (x < ub)
+          })
+        }
+      }
     }
   }

   /**
    * Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliPartitionSampler[T] =
+    new BernoulliPartitionSampler[T](lb, ub, !complement)

-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
 }

+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler based on Bernoulli trials.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  // epsilon slop to avoid failure from floating point jitter
+  @transient val eps: Double = RandomSampler.epsArgs
+  require(fraction >= (0.0 - eps) && fraction <= (1.0 + eps),
+    "Sampling fraction must be on interval [0, 1]")
+
+  private val rng: Random = RandomSampler.rngDefault

-  private[random] var rng = new Poisson(mean, new DRand)
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    fraction match {

--- End diff --

I found the pattern matching to be a nice readable expression, however I don't really have anything against using if/then/else if people prefer it