[
https://issues.apache.org/jira/browse/FLINK-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704896#comment-14704896
]
ASF GitHub Bot commented on FLINK-2030:
---------------------------------------
Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/861#discussion_r37529224
--- Diff:
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
@@ -119,4 +123,65 @@ object MLUtils {
stringRepresentation.writeAsText(filePath)
}
+
+ /** Create a [[ContinuousHistogram]] from the input data
+ *
+ * @param bins Number of bins required
+ * @param data input [[DataSet]] of [[Double]]
+ * @return [[ContinuousHistogram]] over the data
+ */
+ def createContinuousHistogram(data: DataSet[Double], bins: Int):
DataSet[ContinuousHistogram] = {
+ val min = data.reduce((x, y) => Math.min(x, y))
+ val max = data.reduce((x, y) => Math.max(x, y))
+
+ val stats = min.mapWithBcVariable(max) {
+ (minimum, maximum) => (minimum - 2 * (maximum - minimum), maximum +
2 * (maximum - minimum))
+ }
+
+ data.mapPartition(new RichMapPartitionFunction[Double,
ContinuousHistogram] {
+ var statistics: (Double, Double) = _
+
+ override def open(configuration: Configuration): Unit = {
+ statistics =
getRuntimeContext.getBroadcastVariable(HISTOGRAM_STATS).get(0)
+ val minimum = statistics._1
+ val maximum = statistics._2
+ statistics = (minimum - 2 * (maximum - minimum), maximum + 2 *
(maximum - minimum))
+ }
+
+ override def mapPartition(
+ values: java.lang.Iterable[Double],
+ out: Collector[ContinuousHistogram])
+ : Unit = {
+ val localHistogram = new ContinuousHistogram(bins, statistics._1,
statistics._2)
+ val iterator = values.iterator()
+ while (iterator.hasNext) {
+ localHistogram.add(iterator.next())
+ }
+ out.collect(localHistogram)
+ }
+ })
+ .withBroadcastSet(stats, HISTOGRAM_STATS)
+ .reduce((x, y) => x.merge(y, bins))
+ }
+
+ /** Create a [[DiscreteHistogram]] from the input data
+ *
+ * @param data input [[DataSet]] of [[Double]]
+ * @return [[DiscreteHistogram]] over the data
+ */
+ def createDiscreteHistogram(data: DataSet[Double]):
DataSet[DiscreteHistogram] = {
+ data.mapPartition(new RichMapPartitionFunction[Double,
DiscreteHistogram] {
--- End diff --
Yes, but that would require creating a Histogram object for every element.
With mapPartition, we create just one object per partition instead.
> Implement an online histogram with Merging and equalization features
> --------------------------------------------------------------------
>
> Key: FLINK-2030
> URL: https://issues.apache.org/jira/browse/FLINK-2030
> Project: Flink
> Issue Type: Sub-task
> Components: Machine Learning Library
> Reporter: Sachin Goel
> Assignee: Sachin Goel
> Priority: Minor
> Labels: ML
>
> For the implementation of the decision tree in
> https://issues.apache.org/jira/browse/FLINK-1727, we need to implement a
> histogram with online updates, merging, and equalization features. A reference
> implementation is provided in [1].
> [1] http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)