[
https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14730561#comment-14730561
]
ASF GitHub Bot commented on FLINK-2030:
---------------------------------------
Github user chiwanpark commented on a diff in the pull request:
https://github.com/apache/flink/pull/861#discussion_r38735611
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/accumulators/ContinuousHistogram.java
---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static java.lang.Double.MAX_VALUE;
+
+/**
+ * A histogram accumulator designed for continuous-valued data.
+ * It supports:
+ * -- {@link #quantile(double)}
+ * Computes a quantile of the data
+ * -- {@link #count(double)}
+ * Computes the number of items less than the given value in the data
+ * <p>
+ * A continuous histogram stores values in bins in sorted order and keeps their associated
+ * number of items. It is assumed that the items associated with every bin are scattered around
+ * it, half to the right and half to the left.
+ * <p>
+ * bin counters: m_1    m_2    m_3    m_4    m_5    m_6
+ *               10     12      5     10      4      6
+ *                |  5   |  6   | 2.5  |  5   |  2   |
+ *             5  |  +   |  +   |  +   |  +   |  +   |  3
+ *                |  6   | 2.5  |  5   |  2   |  3   |
+ *             - - - - - - - - - - - - - - - - - - - - - -
+ * bin index:     1      2      3      4      5      6
+ * bin values:   v_1 <  v_2 <  v_3 <  v_4 <  v_5 <  v_6
+ * <p>
+ * The number of items between v_i and v_(i+1) is directly proportional to the area of
+ * the trapezoid (v_i, v_(i+1), m_(i+1), m_i).
+ * <p>
+ * Adapted from Ben-Haim and Yom-Tov's
+ * <a href="http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf">Streaming
+ * Decision Tree Algorithm's histogram</a>
+ */
+public class ContinuousHistogram implements Accumulator<Double,
TreeMap<Double, Integer>> {
+
+ // Bins in sorted order: bin value -> number of items in that bin. Returned as-is by
+ // getLocalValue(); its size is the current number of bins.
+ protected TreeMap<Double, Integer> treeMap = new TreeMap<Double,
Integer>();
+
+ // Total number of items added; reported by getTotal() and cleared by resetLocal().
+ protected long counter = 0;
+
+ // Configured bin limit: add(double, int) calls mergeBins() once treeMap grows past it.
+ private int bin;
+
+ // Starts at MAX_VALUE (constructor, resetLocal). Presumably the smallest value seen;
+ // the update logic is not visible here, so TODO confirm.
+ private double lower;
+
+ // Starts at -MAX_VALUE (constructor, resetLocal). Presumably the largest value seen;
+ // TODO confirm.
+ private double upper;
+
+ // Bin "differences" bookkeeping, rebuilt by computeDiffs() in clone(). KeyDiff is not
+ // visible here; presumably orders adjacent-bin gaps for mergeBins(). Verify.
+ private PriorityQueue<KeyDiff> diffQueue;
+
+ // Per-key KeyProps holding key update times (per the comment in clone()).
+ private HashMap<Double, KeyProps> keyUpdateTimes;
+
+ // Set to 0 by the constructor. Not reset by resetLocal() and not copied by clone().
+ // NOTE(review): presumably a logical clock for keyUpdateTimes; confirm.
+ private long timestamp;
+
+ /**
+ * Creates a new continuous histogram with the given number of bins.
+ * The number of bins is the number of values the histogram stores to approximate the
+ * continuous data set. The higher this value, the closer the histogram gets to an exact
+ * representation of the data.
+ *
+ * @param numBins Number of bins in the histogram; bins are merged once this is exceeded
+ * @throws IllegalArgumentException if {@code numBins} is zero or negative
+ */
+ public ContinuousHistogram(int numBins) {
+ if (numBins <= 0) {
+ throw new IllegalArgumentException("Number of bins must
be greater than zero");
+ }
+ bin = numBins;
+ // Sentinel bounds for an empty histogram: the largest finite double and its negation.
+ // NOTE(review): presumably replaced by the first value added; update code not shown.
+ lower = MAX_VALUE;
+ upper = -MAX_VALUE;
+ diffQueue = new PriorityQueue<>();
+ keyUpdateTimes = new HashMap<>();
+ timestamp = 0;
+ }
+
+ /**
+ * Adds a single occurrence of the boxed {@code value} to the histogram.
+ * Consider using {@link #add(double)} for primitive double values to get better performance.
+ * <p>
+ * {@code value} is auto-unboxed to call {@code add(double, int)}, so a {@code null}
+ * argument throws {@link NullPointerException}.
+ *
+ * @param value Value to add; must not be {@code null}
+ */
+ @Override
+ public void add(Double value) {
+ add(value, 1);
+ }
+
+ /**
+ * Adds a single occurrence of {@code value}; equivalent to {@code add(value, 1)}.
+ *
+ * @param value Value to add
+ */
+ public void add(double value) {
+ add(value, 1);
+ }
+
+ /**
+ * Returns the bins of this histogram as a sorted map of bin value to item count.
+ * <p>
+ * This is the live {@link #treeMap}, not a copy. Modifying it changes this histogram
+ * without updating its other bookkeeping (counter, bounds, difference queue).
+ *
+ * @return The internal bin map
+ */
+ @Override
+ public TreeMap<Double, Integer> getLocalValue() {
+ return this.treeMap;
+ }
+
+ /**
+ * Get the total number of items added to this histogram.
+ * This is preserved across merge operations.
+ * <p>
+ * NOTE(review): merge() delegates to fill(), whose body is not shown here; confirm that
+ * fill() adds the other histogram's item counts to {@code counter}.
+ *
+ * @return Total number of items added to the histogram
+ */
+ public long getTotal() {
+ return counter;
+ }
+
+ /**
+ * Get the current size of the {@link #treeMap}, i.e. the current number of bins.
+ * add(double, int) calls mergeBins() when this exceeds the configured bin count.
+ *
+ * @return Size of the {@link #treeMap}
+ */
+ public int getSize() {
+ return treeMap.size();
+ }
+
+ /**
+ * Clears all bins and resets the item counter, the lower/upper bounds, the difference
+ * queue and the key update times.
+ * <p>
+ * The configured bin count ({@code bin}) and {@code timestamp} are left unchanged.
+ */
+ @Override
+ public void resetLocal() {
+ treeMap.clear();
+ counter = 0;
+ lower = MAX_VALUE;
+ upper = -MAX_VALUE;
+ diffQueue.clear();
+ keyUpdateTimes.clear();
+ }
+
+ /**
+ * Merges the bins of {@code other} into this histogram by passing the entry set of its
+ * local value (bin value to item count) to {@code fill}.
+ * <p>
+ * {@code other} may be any accumulator with this value type, not necessarily a
+ * ContinuousHistogram.
+ *
+ * @param other Accumulator whose bins are merged into this histogram
+ */
+ @Override
+ public void merge(Accumulator<Double, TreeMap<Double, Integer>> other) {
+ fill(other.getLocalValue().entrySet());
+ }
+
+ /**
+ * Merges the given other histogram into this histogram, with the number of bins in the
+ * merged histogram being {@code numBins}.
+ * <p>
+ * Note: {@code numBins} permanently replaces this histogram's bin count and, unlike the
+ * constructor, is not validated here. A non-positive value would later make
+ * {@code clone()} throw IllegalArgumentException, since it calls the constructor with it.
+ *
+ * @param other Histogram to be merged
+ * @param numBins Bins in the merged histogram
+ */
+ public void merge(Accumulator<Double, TreeMap<Double, Integer>> other,
int numBins) {
+ bin = numBins;
+ merge(other);
+ }
+
+ /**
+ * Creates a copy of this histogram with the same bin count, bins, item counter and
+ * lower/upper bounds.
+ * <p>
+ * The bin map is copied (its Double keys and Integer counts are immutable), so changing
+ * the copy's bins does not affect this histogram. The difference queue and key update
+ * times are rebuilt via {@code computeDiffs()} rather than copied, and {@code timestamp}
+ * is not copied from this instance.
+ *
+ * @return An independent copy of this histogram
+ */
+ @Override
+ public Accumulator<Double, TreeMap<Double, Integer>> clone() {
+ ContinuousHistogram result = new ContinuousHistogram(bin);
+ result.treeMap = new TreeMap<>(treeMap);
+ result.counter = counter;
+ result.lower = lower;
+ result.upper = upper;
+ // initialize all differences and key update times for the new histogram
+ result.computeDiffs();
+ return result;
+ }
+
+ /**
+ * Adds {@code count} occurrences of {@code value}, then calls {@code mergeBins()} if the
+ * number of bins now exceeds the configured bin count. {@code count} is not validated here.
+ *
+ * @param value Value to add
+ * @param count Number of occurrences of {@code value} to add
+ */
+ void add(double value, int count) {
+ addValue(value, count);
+ if (getSize() > bin) {
+ mergeBins();
+ }
+ }
+
+ void fill(Set<Map.Entry<Double, Integer>> entries) {
--- End diff --
Same as above
> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
> Key: FLINK-2030
> URL: https://issues.apache.org/jira/browse/FLINK-2030
> Project: Flink
> Issue Type: Sub-task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
> Priority: Minor
> Labels: ML
>
> For the implementation of the decision tree in
> https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a
> histogram with online updates, merging, and equalization features. A reference
> implementation is provided in [1].
> [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)