Github user falaki commented on a diff in the pull request: https://github.com/apache/spark/pull/1025#discussion_r14688121 --- Diff: core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala --- @@ -45,11 +50,75 @@ private[spark] object SamplingUtils { val fraction = sampleSizeLowerBound.toDouble / total if (withReplacement) { val numStDev = if (sampleSizeLowerBound < 12) 9 else 5 - fraction + numStDev * math.sqrt(fraction / total) + math.max(1e-10, fraction + numStDev * math.sqrt(fraction / total)) } else { val delta = 1e-4 val gamma = - math.log(delta) / total - math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)) + math.min(1, + math.max(1e-10, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction))) + } + } +} + +/** + * Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact + * sample sizes with high confidence when sampling with replacement. + * + * The algorithm for guaranteeing sample size instantly accepts items whose associated value drawn + * from Pois(s) is less than the lower bound and puts items whose value is between the lower and + * upper bound in a waitlist. The final sample is consisted of all items accepted on the fly and a + * portion of the waitlist needed to make the exact sample size. + */ +private[spark] object PoissonBounds { + + val delta = 1e-4 / 3.0 + val epsilon = 1e-15 + + /** + * Compute the threshold for accepting items on the fly. The threshold value is a fairly small + * number, which means if the item has an associated value < threshold, it is highly likely to + * be in the final sample. Hence we accept items with values less than the returned value of this + * function instantly. 
+ * + * @param s sample size + * @return threshold for accepting items on the fly + */ + def getLowerBound(s: Double): Double = { + var lb = math.max(0.0, s - math.sqrt(s / delta)) // Chebyshev's inequality + var ub = s + while (lb < ub - 1.0) { + val m = (lb + ub) / 2.0 + val poisson = new PoissonDistribution(m, epsilon) + val y = poisson.inverseCumulativeProbability(1 - delta) + if (y > s) ub = m else lb = m + } + lb + } + + def getMinCount(lmbd: Double): Double = { + if (lmbd == 0) return 0 --- End diff --

Style nit: it would be nicer to avoid 'return' and write this as an expression.

```
if (lmbd == 0) {
  0
} else {
  val poisson = new PoissonDistribution(lmbd, epsilon)
  poisson.inverseCumulativeProbability(delta)
}
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---