Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939524 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala --- @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging { val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt) new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString) } + + /** + * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples + * come from the same distribution + * @param data1 `RDD[Double]` first sample of data + * @param data2 `RDD[Double]` second sample of data + * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test + * statistic, p-value, and appropriate null hypothesis + */ + def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = { + val n1 = data1.count().toDouble + val n2 = data2.count().toDouble + val isSample1 = true // identifier for sample 1, needed after co-sort + // combine identified samples + val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1)) + // co-sort and operate on each partition + val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part => + searchTwoSampleCandidates(part, n1, n2) // local extrema + }.collect() + val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme + evalTwoSampleP(ksStat, n1.toInt, n2.toInt) + } + + /** + * Calculates maximum distance candidates and counts from each sample within one partition for + * the two-sample, two-sided Kolmogorov-Smirnov test implementation + * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs, + * each element is additionally tagged with a boolean flag for sample 1 membership + * @param n1 `Double` sample 1 size + * @param n2 `Double` sample 2 size + * @return `Iterator[(Double, Double, 
Double)]` where the first element is an unadjusted minimum + * distance , the second is an unadjusted maximum distance (both of which will later + * be adjusted by a constant to account for elements in prior partitions), and a + * count corresponding to the numerator of the adjustment constant coming from this + * partition + */ + private def searchTwoSampleCandidates( + partData: Iterator[(Double, Boolean)], + n1: Double, + n2: Double) + : Iterator[(Double, Double, Double)] = { + // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2 + case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int) + val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0) + // traverse the data in partition and calculate distances and counts + val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) => + val (add1, add2) = if (isSample1) (1, 0) else (0, 1) + val cdf1 = (acc.ix1 + add1) / n1 + val cdf2 = (acc.ix2 + add2) / n2 + val dist = cdf1 - cdf2 + KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2) + } + val results = if (pResults == initAcc) { + Array[(Double, Double, Double)]() + } else { + Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1)) + } + results.iterator + } + + /** + * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to + * the two-sample, two-sided Kolmogorov-Smirnov test + * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each + * partition, along with the numerator for the necessary constant adjustments + * @param n `Double` The denominator in the constant adjustment (i.e. 
(size of sample 1 ) * (size + * of sample 2)) + * @return The two-sample, two-sided Kolmogorov-Smirnov statistic + */ + private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double) + : Double = { + val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator for constant adjustment --- End diff -- Place this comment on the line above
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org