Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2455#discussion_r18124397
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
    @@ -53,56 +81,237 @@ trait RandomSampler[T, U] extends Pseudorandom with 
Cloneable with Serializable
      * @tparam T item type
      */
     @DeveloperApi
    -class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = 
false)
    +class BernoulliPartitionSampler[T](lb: Double, ub: Double, complement: 
Boolean = false)
       extends RandomSampler[T, T] {
     
    -  private[random] var rng: Random = new XORShiftRandom
    +  // epsilon slop to avoid failure from floating point jitter
    +  @transient val eps: Double = RandomSampler.epsArgs
    +  require(lb <= (ub + eps), "Lower bound (lb) must be <= upper bound (ub)")
    +  require(lb >= (0d - eps), "Lower bound (lb) must be >= 0.0")
    +  require(ub <= (1d + eps), "Upper bound (ub) must be <= 1.0")
     
    -  def this(ratio: Double) = this(0.0d, ratio)
    +  private val rng: Random = new XORShiftRandom
     
       override def setSeed(seed: Long) = rng.setSeed(seed)
     
       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.filter { item =>
    -      val x = rng.nextDouble()
    -      (x >= lb && x < ub) ^ complement
    +    ub-lb match {
    +      case f if (f <= 0d) => if (complement) items else Iterator.empty
    +      case _ => {
    +        if (complement) {
    +          items.filter(item => {
    +            val x = rng.nextDouble()
    +            (x < lb) || (x >= ub)
    +          })
    +        } else {
    +          items.filter(item => {
    +            val x = rng.nextDouble()
    +            (x >= lb) && (x < ub)
    +          })
    +        }
    +      }
         }
       }
     
       /**
        *  Return a sampler that is the complement of the range specified of 
the current sampler.
        */
    -  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, 
ub, !complement)
    +  def cloneComplement(): BernoulliPartitionSampler[T] =
    +    new BernoulliPartitionSampler[T](lb, ub, !complement)
     
    -  override def clone = new BernoulliSampler[T](lb, ub, complement)
    +  override def clone = new BernoulliPartitionSampler[T](lb, ub, complement)
     }
     
    +
     /**
      * :: DeveloperApi ::
    - * A sampler based on values drawn from Poisson distribution.
    + * A sampler based on Bernoulli trials.
      *
    - * @param mean Poisson mean
    + * @param fraction the sampling fraction, aka Bernoulli sampling 
probability
      * @tparam T item type
      */
     @DeveloperApi
    -class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
    +class BernoulliSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
    +
    +  // epsilon slop to avoid failure from floating point jitter
    +  @transient val eps: Double = RandomSampler.epsArgs
    +  require(fraction >= (0.0 - eps)  &&  fraction <= (1.0 + eps),
    +    "Sampling fraction must be on interval [0, 1]")
    +
    +  private val rng: Random = RandomSampler.rngDefault
     
    -  private[random] var rng = new Poisson(mean, new DRand)
    +  override def setSeed(seed: Long) = rng.setSeed(seed)
    +
    +  override def sample(items: Iterator[T]): Iterator[T] = {
    +    fraction match {
    +      case f if (f <= 0.0) => Iterator.empty
    +      case f if (f >= 1.0) => items
    +      case f if (f <= RandomSampler.gsmDefault) =>
    +        new GapSamplingIterator(items, f, rng, RandomSampler.epsDefault)
    +      case _ => items.filter(_ => (rng.nextDouble() <= fraction))
    +    }
    +  }
    +
    +  override def clone = new BernoulliSampler[T](fraction)
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * A sampler for sampling with replacement, based on values drawn from 
Poisson distribution.
    + *
    + * @param fraction the sampling fraction (with replacement)
    + * @tparam T item type
    + */
    +@DeveloperApi
    +class PoissonSampler[T: ClassTag](fraction: Double) extends 
RandomSampler[T, T] {
    +
    +  // epsilon slop to avoid failure from floating point jitter
    +  @transient val eps = RandomSampler.epsArgs
    +  require(fraction >= (0.0 - eps), "Sampling fraction must be >= 0")
    +
    +  private var curseed: Long = System.nanoTime
    +  private var rng = new Poisson(fraction, new DRand(curseed.toInt))
     
       override def setSeed(seed: Long) {
    -    rng = new Poisson(mean, new DRand(seed.toInt))
    +    curseed = seed
    +    rng = new Poisson(fraction, new DRand(seed.toInt))
       }
     
       override def sample(items: Iterator[T]): Iterator[T] = {
    -    items.flatMap { item =>
    -      val count = rng.nextInt()
    -      if (count == 0) {
    -        Iterator.empty
    -      } else {
    -        Iterator.fill(count)(item)
    +    fraction match {
    +      case f if (f <= 0.0) => Iterator.empty
    +      case f if (f <= RandomSampler.gsmDefault) => {
    +        val trng = RandomSampler.rngDefault
    +        trng.setSeed(curseed)
    +        new GapSamplingReplacementIterator(items, f, trng, 
RandomSampler.epsDefault)
           }
    +      case _ => items.flatMap(item => {
    +          val count = rng.nextInt()
    +          if (count == 0) Iterator.empty else Iterator.fill(count)(item)
    +        })
    +    }
    +  }
    +
    +  override def clone = new PoissonSampler[T](fraction)
    +}
    +
    +
    +@DeveloperApi
    +private [spark]
    +class GapSamplingIterator[T: ClassTag](var data: Iterator[T], f: Double,
    +                                       rng: Random = 
RandomSampler.rngDefault,
    +                                       epsilon: Double = 
RandomSampler.epsDefault
    +                                      ) extends Iterator[T] {
    +
    +  require(f > 0.0  &&  f < 1.0, "Sampling fraction (f) must reside on open 
interval (0, 1)")
    +  require(epsilon > 0.0, "epsilon must be > 0")
    +
    +  // implement efficient linear-sequence drop until scala includes fix for 
jira SI-8835
    +  private val dd: Int => Unit = {
    +    val arrayClass = Array.empty[T].iterator.getClass
    --- End diff --
    
    Will it be inefficient to keep looking up the class and its name on every
    invocation?
    And rather than matching on class names, can you just match on an instance
    of `Array` or `ArrayBuffer`? I may be missing a reason you can't do that in
    Scala here. I understand you're avoiding calling `drop` for other types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to