[ https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728571#comment-14728571 ]
ASF GitHub Bot commented on FLINK-2030: --------------------------------------- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r38616759 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/ContinuousHistogram.java --- @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.accumulators; + +import java.util.AbstractMap; +import java.util.Iterator; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeMap; + +import static java.lang.Double.MAX_VALUE; + +/** + * A Histogram accumulator designed for Continuous valued data. + * It supports: + * -- {@link #quantile(double)} + * Computes a quantile of the data + * -- {@link #count(double)} + * Computes number of items less than the given value in the data + * <p> + * A continuous histogram stores values in bins in sorted order and keeps their associated + * number of items. It is assumed that the items associated with every bin are scattered around + * it, half to the right and half to the left. 
+ * <p> + * bin counters: m_1 m_2 m_3 m_4 m_5 m_6 + * 10 12 5 10 4 6 + * | 5 | 6 | 2.5 | 5 | 2 | + * 5 | + | + | + | + | + | 3 + * | 6 | 2.5 | 5 | 2 | 3 | + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * bin index: 1 2 3 4 5 6 + * bin values: v_1 < v_2 < v_3 < v_4 < v_5 < v_6 + * <p> + * The number of items between v_i and v_(i+1) is directly proportional to the area of + * trapezoid (v_i, v_(i+1), m_(i+1), m_i) + * <p> + * Adapted from Ben-Haim and Yom-Tov's + * <a href = "http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf">Streaming Decision Tree Algorithm's histogram</a> + */ +public class ContinuousHistogram extends Histogram { + + private int bin; + private double lower; + private double upper; + private PriorityQueue<KeyDiff> diffQueue; + private HashMap<Double, KeyProps> keyUpdateTimes; + private long timestamp; + + /** + * Creates a new Continuous histogram with the given number of bins + * Bins represents the number of values the histogram stores to approximate the continuous + * data set. The higher this value, the more we move towards an exact representation of data. + * + * @param bin Number of bins in the histogram + */ + public ContinuousHistogram(int bin) { + if (bin <= 0) { + throw new IllegalArgumentException("Number of bins must be greater than zero"); + } + this.bin = bin; + lower = MAX_VALUE; + upper = -MAX_VALUE; + diffQueue = new PriorityQueue<>(); + keyUpdateTimes = new HashMap<>(); + timestamp = 0; + } + + @Override + public void resetLocal() { + super.resetLocal(); + this.lower = MAX_VALUE; + this.upper = -MAX_VALUE; + this.diffQueue.clear(); + this.keyUpdateTimes.clear(); + } + + /** + * Merges the given other histogram into this histogram, with the number of bins in the + * merged histogram being {@code numBins}. 
+ * + * @param other Histogram to be merged + * @param numBins Bins in the merged histogram + */ + public void merge(Accumulator<Double, TreeMap<Double, Integer>> other, int numBins) { + bin = numBins; + super.merge(other); + } + + @Override + public String toString() { + return "Continuous " + super.toString(); + } + + @Override + public Accumulator<Double, TreeMap<Double, Integer>> clone() { + ContinuousHistogram result = new ContinuousHistogram(bin); + result.treeMap = new TreeMap<>(treeMap); + result.counter = counter; + result.lower = lower; + result.upper = upper; + // initialize all differences and key update times for the new histogram + result.computeDiffs(); + return result; + } + + @Override + void add(double value, int count) { + addValue(value, count); + if (getSize() > bin) { + mergeBins(); + } + } + + @Override + void fill(Set<Map.Entry<Double, Integer>> entries) { + for (Map.Entry<Double, Integer> entry : entries) { + if (entry.getValue() <= 0) { + throw new IllegalArgumentException("Negative counters are not allowed: " + entry); + } + } + + for (Map.Entry<Double, Integer> entry : entries) { + addValue(entry.getKey(), entry.getValue()); + } + + while (getSize() > bin) { + mergeBins(); + } + } + + /** + * Adds a new value to the histogram along with an associated count. + * + * @param value Value to be added + * @param count Associated count to this value + */ + private void addValue(double value, int count) { + if (value < lower) { + lower = value; + } + if (value > upper) { + upper = value; + } + + // Add to the map. + counter += count; + Integer current = treeMap.get(value); + Integer newValue = (current != null ? current : 0) + count; + this.treeMap.put(value, newValue); --- End diff -- Please remove `this` to increase uniformity. 
> Implement an online histogram with Merging and equalization features > -------------------------------------------------------------------- > > Key: FLINK-2030 > URL: https://issues.apache.org/jira/browse/FLINK-2030 > Project: Flink > Issue Type: Sub-task > Components: Machine Learning Library > Reporter: Sachin Goel > Assignee: Sachin Goel > Priority: Minor > Labels: ML > > For the implementation of the decision tree in > https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a > histogram with online updates, merging and equalization features. A reference > implementation is provided in [1] > [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)