Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r19629128
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
    @@ -52,57 +87,252 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
      * @tparam T item type
      */
     @DeveloperApi
    -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
    +class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean 
= false)
       extends RandomSampler[T, T] {
     
    -  private[random] var rng: Random = new XORShiftRandom
    +  /** epsilon slop to avoid failure from floating point jitter. */
    +  require(
    +    lb <= (ub + RandomSampler.roundingEpsilon),
    +    s"Lower bound ($lb) must be <= upper bound ($ub)")
    +  require(
    +    lb >= (0.0 - RandomSampler.roundingEpsilon),
    +    s"Lower bound ($lb) must be >= 0.0")
    +  require(
    +    ub <= (1.0 + RandomSampler.roundingEpsilon),
    +    s"Upper bound ($ub) must be <= 1.0")
     
    -  def this(ratio: Double) = this(0.0d, ratio)
    +  private val rng: Random = new XORShiftRandom
     
       override def setSeed(seed: Long) = rng.setSeed(seed)
     
       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.filter { item =>
    -      val x = rng.nextDouble()
    -      (x >= lb && x < ub) ^ complement
    +    if (ub - lb <= 0.0) {
    +      if (complement) items else Iterator.empty
    +    } else {
    +      if (complement) {
    +        items.filter(item => {
    +          val x = rng.nextDouble()
    +          (x < lb) || (x >= ub)
    +        })
    +      } else {
    +        items.filter(item => {
    +          val x = rng.nextDouble()
    +          (x >= lb) && (x < ub)
    +        })
    +      }
         }
       }
     
       /**
        *  Return a sampler that is the complement of the range specified of 
the current sampler.
        */
    -  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, 
ub, !complement)
    +  def cloneComplement(): BernoulliCellSampler[T] =
    +    new BernoulliCellSampler[T](lb, ub, !complement)
    +
    +  override def clone = new BernoulliCellSampler[T](lb, ub, complement)
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * A sampler based on Bernoulli trials.
    + *
    + * @param fraction the sampling fraction, aka Bernoulli sampling 
probability
    + * @tparam T item type
    + */
    +@DeveloperApi
    +class BernoulliSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
    +
    +  /** epsilon slop to avoid failure from floating point jitter */
    +  require(
    +    fraction >= (0.0 - RandomSampler.roundingEpsilon)
    +      && fraction <= (1.0 + RandomSampler.roundingEpsilon),
    +    s"Sampling fraction ($fraction) must be on interval [0, 1]")
     
    -  override def clone = new BernoulliSampler[T](lb, ub, complement)
    +  private val rng: Random = RandomSampler.newDefaultRNG
    +
    +  override def setSeed(seed: Long) = rng.setSeed(seed)
    +
    +  override def sample(items: Iterator[T]): Iterator[T] = {
    +    if (fraction <= 0.0) {
    +      Iterator.empty
    +    } else if (fraction >= 1.0) {
    +      items
    +    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
    +      new GapSamplingIterator(items, fraction, rng, 
RandomSampler.fractionEpsilon)
    +    } else {
    +      items.filter(_ => (rng.nextDouble() <= fraction))
    +    }
    +  }
    +
    +  override def clone = new BernoulliSampler[T](fraction)
     }
     
    +
     /**
      * :: DeveloperApi ::
    - * A sampler based on values drawn from Poisson distribution.
    + * A sampler for sampling with replacement, based on values drawn from 
Poisson distribution.
      *
    - * @param mean Poisson mean
    + * @param fraction the sampling fraction (with replacement)
      * @tparam T item type
      */
     @DeveloperApi
    -class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
    +class PoissonSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
    +
    +  /** Epsilon slop to avoid failure from floating point jitter. */
    +  require(
    +    fraction >= (0.0 - RandomSampler.roundingEpsilon),
    +    s"Sampling fraction ($fraction) must be >= 0")
     
    -  private[random] var rng = new PoissonDistribution(mean)
    +  // PoissonDistribution throws an exception when fraction <= 0
    +  // If fraction is <= 0, Iterator.empty is used below, so we can use any 
placeholder value.
    +  private val rng = new PoissonDistribution(if (fraction > 0.0) fraction 
else 1.0)
    +  private val rngGap = RandomSampler.newDefaultRNG
     
       override def setSeed(seed: Long) {
    -    rng = new PoissonDistribution(mean)
         rng.reseedRandomGenerator(seed)
    +    rngGap.setSeed(seed)
       }
     
       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.flatMap { item =>
    -      val count = rng.sample()
    -      if (count == 0) {
    -        Iterator.empty
    -      } else {
    -        Iterator.fill(count)(item)
    -      }
    +    if (fraction <= 0.0) {
    +      Iterator.empty
    +    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
    +        new GapSamplingReplacementIterator(items, fraction, rngGap, 
RandomSampler.fractionEpsilon)
    +    } else {
    +      items.flatMap(item => {
    +        val count = rng.sample()
    +        if (count == 0) Iterator.empty else Iterator.fill(count)(item)
    +      })
    +    }
    +  }
    +
    +  override def clone = new PoissonSampler[T](fraction)
    +}
    +
    +
    +private [spark]
    +class GapSamplingIterator[T: ClassTag](var data: Iterator[T], f: Double,
    +    rng: Random = RandomSampler.newDefaultRNG,
    +    epsilon: Double = RandomSampler.fractionEpsilon) extends Iterator[T] {
    +
    +  require(f > 0.0  &&  f < 1.0, s"Sampling fraction ($f) must reside on 
open interval (0, 1)")
    +  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
    +
    +  /** implement efficient linear-sequence drop until Scala includes fix 
for jira SI-8835. */
    +  private val iterDrop: Int => Unit = {
    +    val arrayClass = Array.empty[T].iterator.getClass
    +    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
    +    data.getClass match {
    +      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
    +      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
    +      case _ => ((n: Int) => {
    +          var j = 0
    +          while (j < n && data.hasNext) {
    +            data.next()
    +            j += 1
    +          }
    +        })
    +    }
    +  }
    +
    +  override def hasNext: Boolean = data.hasNext
    +
    +  override def next(): T = {
    +    val r = data.next()
    +    advance
    +    r
    +  }
    +
    +  private val lnq = math.log1p(-f)
    +
    +  /** skip elements that won't be sampled, according to geometric dist 
P(k) = (f)(1-f)^k. */
    +  private def advance: Unit = {
    +    val u = math.max(rng.nextDouble(), epsilon)
    +    val k = (math.log(u) / lnq).toInt
    +    iterDrop(k)
    +  }
    +
    +  /** advance to first sample as part of object construction. */
    +  advance
    +}
    +
    +private [spark]
    +class GapSamplingReplacementIterator[T: ClassTag](var data: Iterator[T], 
f: Double,
    +    rng: Random = RandomSampler.newDefaultRNG,
    +    epsilon: Double = RandomSampler.fractionEpsilon) extends Iterator[T] {
    +
    +  require(f > 0.0, s"Sampling fraction ($f) must be > 0")
    +  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
    +
    +  /** implement efficient linear-sequence drop until scala includes fix 
for jira SI-8835. */
    +  private val iterDrop: Int => Unit = {
    +    val arrayClass = Array.empty[T].iterator.getClass
    +    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
    +    data.getClass match {
    +      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
    +      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
    +      case _ => ((n: Int) => {
    +          var j = 0
    +          while (j < n && data.hasNext) {
    +            data.next()
    +            j += 1
    +          }
    +        })
    +    }
    +  }
    +
    +  /** current sampling value, and its replication factor, as we are 
sampling with replacement. */
    +  private var v: T = _
    +  private var rep: Int = 0
    +
    +  override def hasNext: Boolean = data.hasNext || rep > 0
    +
    +  override def next(): T = {
    +    val r = v
    +    rep -= 1
    +    if (rep <= 0) advance
    +    r
    +  }
    +
    +  /**
    +   * Skip elements with replication factor zero (i.e. elements that won't 
be sampled).
    +   * Samples 'k' from geometric distribution  P(k) = (1-q)(q)^k, where q = 
e^(-f), that is
    +   * q is the probability of Poisson(0; f)
    +   */
    +  private def advance: Unit = {
    +    val u = math.max(rng.nextDouble(), epsilon)
    +    val k = (math.log(u)/(-f)).toInt
    +    iterDrop(k)
    +    // set the value and replication factor for the next value
    +    if (data.hasNext) {
    +      v = data.next()
    +      rep = poissonGE1
    +    }
    +  }
    +
    +  private val q = math.exp(-f)
    +
    +  /**
    +   * Sample from Poisson distribution, conditioned such that the sampled 
value is >= 1.
    +   * This is an adaptation from the algorithm for Generating Poisson 
distributed random variables:
    +   * http://en.wikipedia.org/wiki/Poisson_distribution
    +   */
    +  private def poissonGE1: Int = {
    +    // simulate that the standard poisson sampling
    +    // gave us at least one iteration, for a sample of >= 1
    +    var pp = q + ((1.0 - q) * rng.nextDouble())
    +    var r = 1
    +
    +    // now continue with standard poisson sampling algorithm
    +    pp *= rng.nextDouble()
    +    while (pp > q) {
    +      r += 1
    +      pp *= rng.nextDouble()
         }
    +    r
       }
     
    -  override def clone = new PoissonSampler[T](mean)
    +  /** advance to first sample as part of object construction. */
    --- End diff --
    
    Shall we move this line after the constructor and preconditions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to