[
https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728571#comment-14728571
]
ASF GitHub Bot commented on FLINK-2030:
---------------------------------------
Github user chiwanpark commented on a diff in the pull request:
https://github.com/apache/flink/pull/861#discussion_r38616759
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/accumulators/ContinuousHistogram.java
---
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import java.util.AbstractMap;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static java.lang.Double.MAX_VALUE;
+
+/**
+ * A Histogram accumulator designed for Continuous valued data.
+ * It supports:
+ * -- {@link #quantile(double)}
+ * Computes a quantile of the data
+ * -- {@link #count(double)}
+ * Computes number of items less than the given value in the data
+ * <p>
+ * A continuous histogram stores values in bins in sorted order and keeps
their associated
+ * number of items. It is assumed that the items associated with every bin
are scattered around
+ * it, half to the right and half to the left.
+ * <p>
+ * bin counters: m_1 m_2 m_3 m_4 m_5 m_6
+ * 10 12 5 10 4 6
+ * | 5 | 6 | 2.5 | 5 | 2 |
+ * 5 | + | + | + | + | + | 3
+ * | 6 | 2.5 | 5 | 2 | 3 |
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * bin index: 1 2 3 4 5 6
+ * bin values: v_1 < v_2 < v_3 < v_4 < v_5 < v_6
+ * <p>
+ * The number of items between v_i and v_(i+1) is directly proportional to
the area of
+ * trapezoid (v_i, v_(i+1), m_(i+1), m_i)
+ * <p>
+ * Adapted from Ben-Haim and Yom-Tov's
+ * <a href =
"http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf">Streaming
Decision Tree Algorithm's histogram</a>
+ */
+public class ContinuousHistogram extends Histogram {
+
+ private int bin;
+ private double lower;
+ private double upper;
+ private PriorityQueue<KeyDiff> diffQueue;
+ private HashMap<Double, KeyProps> keyUpdateTimes;
+ private long timestamp;
+
+ /**
+ * Creates a new Continuous histogram with the given number of bins
+ * Bins represents the number of values the histogram stores to
approximate the continuous
+ * data set. The higher this value, the more we move towards an exact
representation of data.
+ *
+ * @param bin Number of bins in the histogram
+ */
+ public ContinuousHistogram(int bin) {
+ if (bin <= 0) {
+ throw new IllegalArgumentException("Number of bins must
be greater than zero");
+ }
+ this.bin = bin;
+ lower = MAX_VALUE;
+ upper = -MAX_VALUE;
+ diffQueue = new PriorityQueue<>();
+ keyUpdateTimes = new HashMap<>();
+ timestamp = 0;
+ }
+
+ @Override
+ public void resetLocal() {
+ super.resetLocal();
+ this.lower = MAX_VALUE;
+ this.upper = -MAX_VALUE;
+ this.diffQueue.clear();
+ this.keyUpdateTimes.clear();
+ }
+
+ /**
+ * Merges the given other histogram into this histogram, with the
number of bins in the
+ * merged histogram being {@code numBins}.
+ *
+ * @param other Histogram to be merged
+ * @param numBins Bins in the merged histogram
+ */
+ public void merge(Accumulator<Double, TreeMap<Double, Integer>> other,
int numBins) {
+ bin = numBins;
+ super.merge(other);
+ }
+
+ @Override
+ public String toString() {
+ return "Continuous " + super.toString();
+ }
+
+ @Override
+ public Accumulator<Double, TreeMap<Double, Integer>> clone() {
+ ContinuousHistogram result = new ContinuousHistogram(bin);
+ result.treeMap = new TreeMap<>(treeMap);
+ result.counter = counter;
+ result.lower = lower;
+ result.upper = upper;
+ // initialize all differences and key update times for the new
histogram
+ result.computeDiffs();
+ return result;
+ }
+
+ @Override
+ void add(double value, int count) {
+ addValue(value, count);
+ if (getSize() > bin) {
+ mergeBins();
+ }
+ }
+
+ @Override
+ void fill(Set<Map.Entry<Double, Integer>> entries) {
+ for (Map.Entry<Double, Integer> entry : entries) {
+ if (entry.getValue() <= 0) {
+ throw new IllegalArgumentException("Negative
counters are not allowed: " + entry);
+ }
+ }
+
+ for (Map.Entry<Double, Integer> entry : entries) {
+ addValue(entry.getKey(), entry.getValue());
+ }
+
+ while (getSize() > bin) {
+ mergeBins();
+ }
+ }
+
+ /**
+ * Adds a new value to the histogram along with an associated count.
+ *
+ * @param value Value to be added
+ * @param count Associated count to this value
+ */
+ private void addValue(double value, int count) {
+ if (value < lower) {
+ lower = value;
+ }
+ if (value > upper) {
+ upper = value;
+ }
+
+ // Add to the map.
+ counter += count;
+ Integer current = treeMap.get(value);
+ Integer newValue = (current != null ? current : 0) + count;
+ this.treeMap.put(value, newValue);
--- End diff --
Please remove `this` to increase uniformity.
> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
> Key: FLINK-2030
> URL: https://issues.apache.org/jira/browse/FLINK-2030
> Project: Flink
> Issue Type: Sub-task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
> Priority: Minor
> Labels: ML
>
> For the implementation of the decision tree in
> https://issues.apache.org/jira/browse/FLINK-1727, we need to implement an
> histogram with online updates, merging and equalization features. A reference
> implementation is provided in [1]
> [1].http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)