[ https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705227#comment-14705227 ]
ASF GitHub Bot commented on FLINK-2030: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37551130 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala --- @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.statistics + +import scala.Double.MaxValue +import scala.collection.mutable + +/** Implementation of a continuous valued online histogram + * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm + * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + * + * =Parameters= + * -[[capacity]]: + * Number of bins to be used in the histogram + * + * -[[min]]: + * Lower limit on the elements + * + * -[[max]]: + * Upper limit on the elements + */ +class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram { + + private val lower = min + private val upper = max + + require(capacity > 0, "Capacity should be a positive integer") + require(lower < upper, "Lower must be less than upper") + + val data = new mutable.ArrayBuffer[(Double, Int)]() + + /** Adds a new item to the histogram + * + * @param p value to be added + */ + override def add(p: Double): Unit = { + require(p > lower && p < upper, p + " not in (" + lower + "," + upper + ")") + // search for the index where the value is just higher than p + val search = find(p) + // add the new value there, shifting everything to the right + data.insert(search, (p, 1)) + // If we're over capacity or any two elements are within 1e-9 of each other, merge. 
+ // This will take care of the case if p was actually equal to some value in the histogram and + // just increment the value there + mergeElements() + } + + /** Merges the histogram with h and returns a histogram with capacity B + * + * @param h histogram to be merged + * @param B capacity of the resultant histogram + * @return Merged histogram with capacity B + */ + override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = { + h match { + case temp: ContinuousHistogram => { + val m: Int = bins + val n: Int = temp.bins + var i, j: Int = 0 + val mergeList = new mutable.ArrayBuffer[(Double, Int)]() + while (i < m || j < n) { + if (i >= m) { + mergeList += ((temp.getValue(j), temp.getCounter(j))) + j = j + 1 + } else if (j >= n || getValue(i) <= temp.getValue(j)) { + mergeList += data.apply(i) + i = i + 1 + } else { + mergeList += ((temp.getValue(j), temp.getCounter(j))) + j = j + 1 + } + } + // the size will be brought to capacity while constructing the new histogram itself + val finalLower = Math.min(lower, temp.lower) + val finalUpper = Math.max(upper, temp.upper) + val ret = new ContinuousHistogram(B, finalLower, finalUpper) + ret.loadData(mergeList.toArray) + ret + } + case default => + throw new RuntimeException("Only a continuous histogram is allowed to be merged with a " + + "continuous histogram") + + } + } + + /** Returns the qth quantile of the histogram + * + * @param q Quantile value in (0,1) + * @return Value at quantile q + */ + def quantile(q: Double): Double = { + require(bins > 0, "Histogram is empty") + require(q > 0 && q < 1, "Quantile must be between 0 and 1") + val wantedSum = (q * total).round.toInt + var currSum = count(getValue(0)) + + if (wantedSum < currSum) { + require(lower > -MaxValue, "Set a lower bound before proceeding") + return Math.sqrt(2 * wantedSum * Math.pow(getValue(0) - lower, 2) / getCounter(0)) + lower + } + + /** Walk the histogram to find sums at every bin value + * As soon as you hit the interval where you 
should be + * Walk along the trapezoidal line + * This leads to solving a quadratic equation. + */ + for (i <- 1 to bins - 1) { + val tmpSum = count(getValue(i)) + if (currSum <= wantedSum && wantedSum < tmpSum) { + val neededSum = wantedSum - currSum + val a: Double = getCounter(i) - getCounter(i - 1) + val b: Double = 2 * getCounter(i - 1) + val c: Double = -2 * neededSum --- End diff -- Could you elaborate a little bit on the formula used? I don't understand *walk along the trapezoidal line*. > Implement an online histogram with Merging and equalization features > -------------------------------------------------------------------- > > Key: FLINK-2030 > URL: https://issues.apache.org/jira/browse/FLINK-2030 > Project: Flink > Issue Type: Sub-task > Components: Machine Learning Library > Reporter: Sachin Goel > Assignee: Sachin Goel > Priority: Minor > Labels: ML > > For the implementation of the decision tree in > https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a > histogram with online updates, merging and equalization features. A reference > implementation is provided in [1] > [1].http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)