[ https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704870#comment-14704870 ]
ASF GitHub Bot commented on FLINK-2030: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37527987 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala --- @@ -119,4 +123,65 @@ object MLUtils { stringRepresentation.writeAsText(filePath) } + + /** Create a [[ContinuousHistogram]] from the input data + * + * @param bins Number of bins required + * @param data input [[DataSet]] of [[Double]] + * @return [[ContinuousHistogram]] over the data + */ + def createContinuousHistogram(data: DataSet[Double], bins: Int): DataSet[ContinuousHistogram] = { + val min = data.reduce((x, y) => Math.min(x, y)) + val max = data.reduce((x, y) => Math.max(x, y)) + + val stats = min.mapWithBcVariable(max) { + (minimum, maximum) => (minimum - 2 * (maximum - minimum), maximum + 2 * (maximum - minimum)) + } + + data.mapPartition(new RichMapPartitionFunction[Double, ContinuousHistogram] { + var statistics: (Double, Double) = _ + + override def open(configuration: Configuration): Unit = { + statistics = getRuntimeContext.getBroadcastVariable(HISTOGRAM_STATS).get(0) + val minimum = statistics._1 + val maximum = statistics._2 + statistics = (minimum - 2 * (maximum - minimum), maximum + 2 * (maximum - minimum)) + } + + override def mapPartition( + values: java.lang.Iterable[Double], + out: Collector[ContinuousHistogram]) + : Unit = { + val localHistogram = new ContinuousHistogram(bins, statistics._1, statistics._2) + val iterator = values.iterator() + while (iterator.hasNext) { + localHistogram.add(iterator.next()) + } + out.collect(localHistogram) + } + }) + .withBroadcastSet(stats, HISTOGRAM_STATS) + .reduce((x, y) => x.merge(y, bins)) --- End diff -- same here > Implement an online histogram with Merging and equalization features > -------------------------------------------------------------------- > > Key: FLINK-2030 > URL: 
https://issues.apache.org/jira/browse/FLINK-2030 > Project: Flink > Issue Type: Sub-task > Components: Machine Learning Library > Reporter: Sachin Goel > Assignee: Sachin Goel > Priority: Minor > Labels: ML > > For the implementation of the decision tree in > https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a > histogram with online updates, merging and equalization features. A reference > implementation is provided in [1] > [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)