Repository: spark Updated Branches: refs/heads/master 4d01aa464 -> d6f76eb34
[SPARK-21057][ML] Do not use a PascalDistribution in countApprox ## What changes were proposed in this pull request? Use Poisson analysis for approx count in all cases. ## How was this patch tested? Existing tests. Author: Sean Owen <so...@cloudera.com> Closes #18276 from srowen/SPARK-21057. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f76eb3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f76eb3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f76eb3 Branch: refs/heads/master Commit: d6f76eb346b691a553e89d3283f2a235661ae78c Parents: 4d01aa4 Author: Sean Owen <so...@cloudera.com> Authored: Wed Jun 14 09:01:20 2017 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Jun 14 09:01:20 2017 +0100 ---------------------------------------------------------------------- .../apache/spark/partial/CountEvaluator.scala | 23 +++++--------------- .../spark/partial/CountEvaluatorSuite.scala | 12 +++++----- 2 files changed, 12 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d6f76eb3/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala index 5a5bd7f..cbee136 100644 --- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import org.apache.commons.math3.distribution.{PascalDistribution, PoissonDistribution} +import org.apache.commons.math3.distribution.PoissonDistribution /** * An ApproximateEvaluator for counts. @@ -48,22 +48,11 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double) private[partial] object CountEvaluator { def bound(confidence: Double, sum: Long, p: Double): BoundedDouble = { - // Let the total count be N. A fraction p has been counted already, with sum 'sum', - // as if each element from the total data set had been seen with probability p. - val dist = - if (sum <= 10000) { - // The remaining count, k=N-sum, may be modeled as negative binomial (aka Pascal), - // where there have been 'sum' successes of probability p already. (There are several - // conventions, but this is the one followed by Commons Math3.) - new PascalDistribution(sum.toInt, p) - } else { - // For large 'sum' (certainly, > Int.MaxValue!), use a Poisson approximation, which has - // a different interpretation. "sum" elements have been observed having scanned a fraction - // p of the data. This suggests data is counted at a rate of sum / p across the whole data - // set. The total expected count from the rest is distributed as - // (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p) - new PoissonDistribution(sum * (1 - p) / p) - } + // "sum" elements have been observed having scanned a fraction + // p of the data. This suggests data is counted at a rate of sum / p across the whole data + // set. The total expected count from the rest is distributed as + // (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p) + val dist = new PoissonDistribution(sum * (1 - p) / p) // Not quite symmetric; calculate interval straight from discrete distribution val low = dist.inverseCumulativeProbability((1 - confidence) / 2) val high = dist.inverseCumulativeProbability((1 + confidence) / 2) http://git-wip-us.apache.org/repos/asf/spark/blob/d6f76eb3/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala b/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala index da3256b..3c1208c 100644 --- a/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala @@ -23,21 +23,21 @@ class CountEvaluatorSuite extends SparkFunSuite { test("test count 0") { val evaluator = new CountEvaluator(10, 0.95) - assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity)) evaluator.merge(1, 0) - assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity)) } test("test count >= 1") { val evaluator = new CountEvaluator(10, 0.95) evaluator.merge(1, 1) - assert(new BoundedDouble(10.0, 0.95, 1.0, 36.0) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(10.0, 0.95, 5.0, 16.0)) evaluator.merge(1, 3) - assert(new BoundedDouble(20.0, 0.95, 7.0, 41.0) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(20.0, 0.95, 13.0, 28.0)) evaluator.merge(1, 8) - assert(new BoundedDouble(40.0, 0.95, 24.0, 61.0) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(40.0, 0.95, 30.0, 51.0)) (4 to 10).foreach(_ => evaluator.merge(1, 10)) - assert(new BoundedDouble(82.0, 1.0, 82.0, 82.0) == evaluator.currentResult()) + assert(evaluator.currentResult() === new BoundedDouble(82.0, 1.0, 82.0, 82.0)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org