Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/216#discussion_r11168529 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.discretization + +import scala.collection.mutable +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.mllib.regression.LabeledPoint + +/** + * This class contains methods to calculate thresholds to discretize continuous values with the + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued + * Attributes (1993). + * + * @param continuousFeaturesIndexes Indexes of features to be discretized. + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition. + * @param maxBins Maximum number of bins for each discretized feature. 
+ */ +class EntropyMinimizationDiscretizer private ( + val continuousFeaturesIndexes: Seq[Int], + val elementsPerPartition: Int, + val maxBins: Int) + extends Serializable { + + private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt } + private val log2 = { x: Double => math.log(x) / math.log(2) } + + /** + * Run the algorithm with the configured parameters on an input. + * @param data RDD of LabeledPoint's. + * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize. + */ + def run(data: RDD[LabeledPoint]) = { + val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap) + val nLabels = labels2Int.value.size + + var thresholds = Map.empty[Int, Seq[Double]] + for (i <- continuousFeaturesIndexes) { + val featureValues = data.map({ + case LabeledPoint(label, values) => (values(i), labels2Int.value(label)) + }) + val sortedValues = featureValues.sortByKey() + val initialCandidates = initialThresholds(sortedValues, nLabels) + val thresholdsForFeature = this.getThresholds(initialCandidates, nLabels) + thresholds += ((i, thresholdsForFeature)) + } + + new EntropyMinimizationDiscretizerModel(thresholds) + + } + + /** + * Calculates the initial candidate treholds for a feature + * @param data RDD of (value, label) pairs. + * @param nLabels Number of distinct labels in the dataset. + * @return RDD of (candidate, class frequencies between last and current candidate) pairs. 
+ */ + private def initialThresholds(data: RDD[(Double, Int)], nLabels: Int) = { + data.mapPartitions({ it => + var lastX = Double.NegativeInfinity + var lastY = Int.MinValue + var result = Seq.empty[(Double, Array[Long])] + var freqs = Array.fill[Long](nLabels)(0L) + + for ((x, y) <- it) { + if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) { + // new candidate and interval + result = ((x + lastX) / 2, freqs) +: result + freqs = Array.fill[Long](nLabels)(0L) + freqs(y) = 1L + } else { + // we continue on the same interval + freqs(y) += 1 + } + lastX = x + lastY = y + } + + // we add last element as a candidate threshold for convenience + result = (lastX, freqs) +: result + + result.reverse.toIterator + }).persist(StorageLevel.MEMORY_AND_DISK) + } + + /** + * Returns a sequence of doubles that define the intervals to make the discretization. + * + * @param candidates RDD of (value, label) pairs + */ + private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = { + + // Create queue + val stack = new mutable.Queue[((Double, Double), Option[Double])] + + // Insert first in the stack + stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None)) + var result = Seq(Double.NegativeInfinity) + + // While more elements to eval, continue + while(stack.length > 0 && result.size < this.maxBins){ + + val (bounds, lastThresh) = stack.dequeue + + var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 }) + val nCands = cands.count + if (nCands > 0) { + cands = cands.coalesce(partitions(nCands)) + + evalThresholds(cands, lastThresh, nLabels) match { + case Some(th) => + result = th +: result + stack.enqueue(((bounds._1, th), Some(th))) + stack.enqueue(((th, bounds._2), Some(th))) + case None => /* criteria not fulfilled, finish */ + } + } + } + (Double.PositiveInfinity +: result).sorted + } + + /** + * Selects one final thresholds among the candidates and returns two partitions to recurse + 
* + * @param candidates RDD of (candidate, class frequencies between last and current candidate) + * @param lastSelected last selected threshold to avoid selecting it again + */ + private def evalThresholds( + candidates: RDD[(Double, Array[Long])], + lastSelected : Option[Double], + nLabels: Int) = { + + var result = candidates.map({ + case (cand, freqs) => + (cand, freqs, Array.empty[Long], Array.empty[Long]) + }).cache + + val numPartitions = candidates.partitions.size + val bcNumPartitions = candidates.context.broadcast(numPartitions) + + // stores accumulated freqs from left to right + val bcLeftTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) + // stores accumulated freqs from right to left + val bcRightTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator) + + // calculates accumulated frequencies for each candidate + (0 until numPartitions) foreach { l2rIndex => --- End diff -- This block is still quite complicated. Let's say you have a partitioned frequency sequence: ~~~ 1 2 1 3 || 1 1 1 1 || 1 2 3 2 ~~~ From left to right, you want the cumulative sums (cumsums): ~~~ 1 3 4 7 || 8 9 10 11 || 12 14 17 19 ~~~ From right to left, you want: ~~~ 19 18 16 15 || 12 11 10 9 || 8 7 5 2 ~~~ This can be done by first computing partition-wise sums: ~~~ 7 || 4 || 8 ~~~ Then compute the cumsums of those partition sums from left to right: ~~~ 0 || 7 || 11 || 19 ~~~ and from right to left: ~~~ 19 || 12 || 8 || 0 ~~~ The global cumsum then becomes, from left to right (each partition's offset plus its local cumsum): ~~~ 0 + cumsum(1, 2, 1, 3) || 7 + cumsum(1, 1, 1, 1) || 11 + cumsum(1, 2, 3, 2) ~~~ and from right to left (each partition's offset minus its local cumsum shifted by one element, hence the leading 0): ~~~ 19 - cumsum(0, 1, 2, 1) || 12 - cumsum(0, 1, 1, 1) || 8 - cumsum(0, 1, 2, 3) ~~~ I may be missing some details, but two jobs should be sufficient for this computation.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---