Github user LIDIAgroup commented on a diff in the pull request:

https://github.com/apache/spark/pull/216#discussion_r11019751

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.discretization
+
+import scala.collection.mutable.Stack
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.util.InfoTheory
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.mllib.regression.LabeledPoint
+import scala.collection.mutable
+
+
+/**
+ * This class contains methods to discretize continuous values with the method proposed in
+ * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
+ */
+class EntropyMinimizationDiscretizer private (
+    @transient var data: RDD[LabeledPoint],
+    @transient var continousFeatures: Seq[Int],
+    var elementsPerPartition: Int = 18000,
+    var maxBins: Int = Int.MaxValue)
+  extends DiscretizerModel {
+
+  private var thresholds = Map.empty[Int, Seq[Double]]
+  private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
+
+  def this() = this(null, null)
+
+  /**
+   * Sets the RDD[LabeledPoint] to be discretized
+   *
+   * @param data RDD[LabeledPoint] to be discretized
+   */
+  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
+    this.data = data
+    this
+  }
+
+  /**
+   * Sets the indexes of the features to be discretized
+   *
+   * @param continuousFeatures Indexes of the features to be discretized
+   */
+  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
+    this.continousFeatures = continuousFeatures
+    this
+  }
+
+  /**
+   * Sets the maximum number of elements that a partition should have
+   *
+   * @param ratio Maximum number of elements per partition
+   * @return The Discretizer with the parameter changed
+   */
+  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
+    this.elementsPerPartition = ratio
+    this
+  }
+
+  /**
+   * Sets the maximum number of discrete values
+   *
+   * @param maxBins Maximum number of discrete values
+   * @return The Discretizer with the parameter changed
+   */
+  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
+    this.maxBins = maxBins
+    this
+  }
+
+  /**
+   * Returns the thresholds used to discretize the given feature
+   *
+   * @param feature The index of the feature to discretize
+   */
+  def getThresholdsForFeature(feature: Int): Seq[Double] = {
+    thresholds.get(feature) match {
+      case Some(a) => a
+      case None =>
+        val featureValues = data.map({
+          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
+        })
+        val sortedValues = featureValues.sortByKey()
+        val initial_candidates = initialThresholds(sortedValues)
+        val thresholdsForFeature = this.getThresholds(initial_candidates)
+        this.thresholds += ((feature, thresholdsForFeature))
+        thresholdsForFeature
+    }
+  }
+
+  /**
+   * Returns the thresholds used to discretize the given features
+   *
+   * @param features The indexes of the features to discretize
+   */
+  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
+    for (feature <- features diff this.thresholds.keys.toSeq) {
+      getThresholdsForFeature(feature)
+    }
+
+    this.thresholds.filter({ features.contains(_) })
+  }
+
+  /**
+   * Returns the thresholds used to discretize the continuous features
+   */
+  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
+    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
+      getThresholdsForFeature(feature)
+    }
+
+    this.thresholds
+  }
+
+  /**
+   * Calculates the initial candidate thresholds for a feature
+   * @param data RDD of (value, label) pairs
+   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
+   */
+  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String, Int])] = {
+    data.mapPartitions({ it =>
+      var lastX = Double.NegativeInfinity
+      var lastY = ""
+      var result = Seq.empty[(Double, Map[String, Int])]
+      var freqs = Map.empty[String, Int]
+
+      for ((x, y) <- it) {
+        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
+          // new candidate and interval
+          result = ((x + lastX) / 2, freqs) +: result
+          freqs = freqs.empty + ((y, 1))
+        } else {
+          // we continue on the same interval
+          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
+        }
+        lastX = x
+        lastY = y
+      }
+
+      // we add the last element as a candidate threshold for convenience
+      result = (lastX, freqs) +: result
+
+      result.reverse.toIterator
+    }).persist(StorageLevel.MEMORY_AND_DISK)
+  }
+
+  /**
+   * Returns a sequence of doubles that define the intervals to make the discretization.
+   *
+   * @param candidates RDD of (candidate, class frequencies) pairs
+   */
+  private def getThresholds(candidates: RDD[(Double, Map[String, Int])]): Seq[Double] = {
+
+    // Create queue
+    val stack = new mutable.Queue[((Double, Double), Option[Double])]
+
+    // Insert first in the stack
+    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
+    var result = Seq(Double.NegativeInfinity)
+
+    // While more elements to eval, continue
+    while (stack.length > 0 && result.size < this.maxBins) {
+
+      val (bounds, lastThresh) = stack.dequeue
+
+      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
+      val nCands = cands.count
+      if (nCands > 0) {
+        cands = cands.coalesce(partitions(nCands))
--- End diff --

This tries to avoid ending up with lots of partitions that hold just a few items. Because the algorithm works on successively smaller subsets of the initial candidate set, the RDD has fewer elements at each step; if you don't coalesce it, you end up with an RDD with as many partitions as the original data but very few items in each.
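
For reference, the pattern is essentially "size the partition count to the element count before reusing the RDD". Here is a minimal, self-contained sketch of that idea (the `rebalance` helper and the standalone object are hypothetical, not part of this PR), assuming the same 18000 elements-per-partition default used above:

    import org.apache.spark.rdd.RDD

    object PartitioningSketch {
      // Hypothetical helper, analogous to `partitions(nCands)` above: choose a partition
      // count proportional to the number of elements, targeting `elementsPerPartition`
      // elements in each partition.
      def rebalance[T](rdd: RDD[T], elementsPerPartition: Int = 18000): RDD[T] = {
        val n = rdd.count()
        val target = math.max(1, math.ceil(n.toDouble / elementsPerPartition).toInt)
        // Only shrink: coalesce without a shuffle cannot increase the partition count.
        if (target < rdd.partitions.length) rdd.coalesce(target) else rdd
      }
    }

The count() needed to size the coalesce costs an extra pass over the candidates, but it keeps later operations on the shrinking candidate set from scheduling as many tasks as the original input had partitions, most of them nearly empty.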