Github user falaki commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1025#discussion_r14688121
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala ---
    @@ -45,11 +50,75 @@ private[spark] object SamplingUtils {
         val fraction = sampleSizeLowerBound.toDouble / total
         if (withReplacement) {
           val numStDev = if (sampleSizeLowerBound < 12) 9 else 5
    -      fraction + numStDev * math.sqrt(fraction / total)
    +      math.max(1e-10, fraction + numStDev * math.sqrt(fraction / total))
         } else {
           val delta = 1e-4
           val gamma = - math.log(delta) / total
    -      math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * 
fraction))
    +      math.min(1,
    +        math.max(1e-10, fraction + gamma + math.sqrt(gamma * gamma + 2 * 
gamma * fraction)))
    +    }
    +  }
    +}
    +
    +/**
    + * Utility functions that help us determine bounds on adjusted sampling 
rate to guarantee exact
    + * sample sizes with high confidence when sampling with replacement.
    + *
    + * The algorithm for guaranteeing sample size instantly accepts items 
whose associated value drawn
    + * from Pois(s) is less than the lower bound and puts items whose value is 
between the lower and
    + * upper bound in a waitlist. The final sample is consisted of all items 
accepted on the fly and a
    + * portion of the waitlist needed to make the exact sample size.
    + */
    +private[spark] object PoissonBounds {
    +
    +  val delta = 1e-4 / 3.0
    +  val epsilon = 1e-15
    +
    +  /**
    +   * Compute the threshold for accepting items on the fly. The threshold 
value is a fairly small
    +   * number, which means if the item has an associated value < threshold, 
it is highly likely to
    +   * be in the final sample. Hence we accept items with values less than 
the returned value of this
    +   * function instantly.
    +   *
    +   * @param s sample size
    +   * @return threshold for accepting items on the fly
    +   */
    +  def getLowerBound(s: Double): Double = {
    +    var lb = math.max(0.0, s - math.sqrt(s / delta)) // Chebyshev's 
inequality
    +    var ub = s
    +    while (lb < ub - 1.0) {
    +      val m = (lb + ub) / 2.0
    +      val poisson = new PoissonDistribution(m, epsilon)
    +      val y = poisson.inverseCumulativeProbability(1 - delta)
    +      if (y > s) ub = m else lb = m
    +    }
    +    lb
    +  }
    +
    +  def getMinCount(lmbd: Double): Double = {
    +    if (lmbd == 0) return 0
    --- End diff --
    
    Style nit: it would be nicer to avoid 'return' and write this as an 
expression.
    ```
    if (lmbd == 0) {
      0
    } else {
      val poisson = new PoissonDistribution(lmbd, epsilon)
      poisson.inverseCumulativeProbability(delta)
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to