[
https://issues.apache.org/jira/browse/FLINK-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15130238#comment-15130238
]
ASF GitHub Bot commented on FLINK-3128:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1565#discussion_r51707141
--- Diff:
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/IsotonicRegression.scala
---
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import java.util.Arrays.binarySearch
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation,
Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Result of fitting an [[IsotonicRegression]] predictor.
+ *
+ * NOTE(review): presumably `boundaries` holds the sorted feature values of the fitted
+ * function and `predictions` the fitted label at each boundary (as in Spark's model) —
+ * confirm against the fit/predict operations.
+ *
+ * Both fields are arrays, so the generated `equals`/`hashCode` compare them by reference,
+ * not element-wise.
+ *
+ * @param boundaries  feature values of the fitted function
+ * @param predictions fitted label values (presumably aligned index-by-index with `boundaries`)
+ */
+case class IsotonicRegressionModel(
+    boundaries: Array[Double],
+    predictions: Array[Double])
+
+/**
+ * Isotonic regression.
+ * Currently implemented using a parallelized pool adjacent violators algorithm.
+ * Only the univariate (single feature) algorithm is supported.
+ *
+ * Sequential PAV implementation based on:
+ * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+ * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+ * Available from [[http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf]]
+ *
+ * Sequential PAV parallelization based on:
+ * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+ * "An approach to parallelizing isotonic regression."
+ * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
+ * Available from [[http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf]]
+ *
+ * @see [[http://en.wikipedia.org/wiki/Isotonic_regression Isotonic regression (Wikipedia)]]
+ *
+ * This is a port from the implementation in Apache Spark.
+ *
+ * @example
+ * {{{
+ *   val ir = IsotonicRegression()
+ *     .setIsotonic(true)
+ *
+ *   // Training tuples are (label, feature, weight).
+ *   val trainingDS: DataSet[(Double, Double, Double)] = ...
+ *   val testingDS: DataSet[Double] = ...
+ *
+ *   ir.fit(trainingDS)
+ *
+ *   val predictions = ir.predict(testingDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ *  - [[org.apache.flink.ml.regression.IsotonicRegression.Isotonic]]:
+ *  true if labels shall be ascending, false if labels shall be descending.
+ *  (Default value: '''true''')
+ */
+class IsotonicRegression extends Predictor[IsotonicRegression] {
+
+  /**
+   * `true` if the fitted labels shall be ascending, `false` if descending.
+   * The fit operation reads this field directly.
+   */
+  var isotonic = true
+
+  /**
+   * The fitted model; `None` until one has been assigned.
+   * NOTE(review): presumably set by the fit operation — confirm.
+   */
+  var model: Option[DataSet[IsotonicRegressionModel]] = None
+
+  /**
+   * Selects ascending (`true`) or descending (`false`) labels for the fitted function.
+   *
+   * The value is written both to the `isotonic` field, which the fit operation reads, and to
+   * this predictor's parameter map under `IsotonicRegression.Isotonic`. This keeps the
+   * documented parameter in sync with the configured value.
+   *
+   * @param isotonic `true` for ascending labels, `false` for descending labels
+   * @return this predictor, to allow call chaining
+   */
+  def setIsotonic(isotonic: Boolean): this.type = {
+    this.isotonic = isotonic
+    parameters.add(IsotonicRegression.Isotonic, isotonic)
+    this
+  }
+
+}
+
+object IsotonicRegression {
+
+  // ====================================== Parameters ===========================================
+
+  /**
+   * Parameter selecting ascending (`true`) or descending (`false`) labels.
+   * Defaults to `true`.
+   */
+  case object Isotonic extends Parameter[Boolean] {
+    override val defaultValue = Some(true)
+  }
+
+  // ======================================== Factory methods ====================================
+
+  /**
+   * Creates a new [[IsotonicRegression]] predictor; its `isotonic` flag starts out as `true`.
+   */
+  def apply(): IsotonicRegression = new IsotonicRegression
+
+  // ====================================== Operations ===========================================
+
+  /**
+   * Flink [[MapFunction]] that runs a sequential pool adjacent violators (PAV) pass over one
+   * array of (label, feature, weight) tuples and returns the pooled, compressed tuples.
+   */
+  class AdjacentPoolViolatersMapper
+    extends MapFunction[Array[(Double, Double, Double)], Array[(Double, Double, Double)]] {
+
+    /**
+     * Performs a pool adjacent violators algorithm (PAV).
+     * Uses an approach with a single pass over the data, where violators in previously
+     * processed data that were created by pooling are fixed immediately.
+     * Uses the optimization of discovering monotonicity-violating sequences (blocks).
+     *
+     * The `input` array is modified in place: pooled labels are written back into it before
+     * the compressed result is built.
+     *
+     * Consecutive points that end up with the same label are compressed to at most two
+     * points. The first point of the run carries the run's summed weight. If the run reaches
+     * a larger feature value, a second point at that right bound is added with weight 0.0.
+     *
+     * NOTE(review): assumes `input` is sorted by feature, since only neighbouring elements
+     * are compared. The partition step visible in `fitIR` sorts by (feature, label) — confirm
+     * that it feeds this mapper.
+     * NOTE(review): weights are not validated. A pooled span whose weights sum to 0.0
+     * produces NaN labels.
+     *
+     * @param input Input data of tuples (label, feature, weight).
+     * @return Result tuples (label, feature, weight) where labels were updated
+     *         to form a monotone (non-decreasing) sequence as per the isotonic regression
+     *         definition, compressed as described above. Empty input yields an empty array.
+     */
+    override def map(input: Array[(Double, Double, Double)]):
+      Array[(Double, Double, Double)] = {
+      if (input.isEmpty) {
+        return Array.empty
+      }
+
+      // Pools the sub-array input(start..end) (inclusive bounds), assigning the weighted
+      // average label to all elements while keeping each element's feature and weight.
+      def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
+        val poolSubArray = input.slice(start, end + 1)
+
+        val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
+        val weight = poolSubArray.map(_._3).sum
+
+        // NOTE(review): no guard for weight == 0.0 (all-zero weights give NaN labels).
+        var i = start
+        while (i <= end) {
+          input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
+          i = i + 1
+        }
+      }
+
+      var i = 0
+      val len = input.length
+      while (i < len) {
+        var j = i
+
+        // Find a monotonicity-violating sequence, if any: extend j while labels strictly
+        // decrease.
+        while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
+          j = j + 1
+        }
+
+        // If monotonicity was not violated, move to next data point.
+        if (i == j) {
+          i = i + 1
+        } else {
+          // Otherwise pool the violating sequence and check whether pooling caused a
+          // monotonicity violation in previously processed points. Keep walking left and
+          // re-pooling i..j while the label at i exceeds the label at i + 1.
+          // NOTE(review): each backward step re-slices and re-sums input(i..j), so the
+          // worst case is quadratic in the input length.
+          while (i >= 0 && input(i)._1 > input(i + 1)._1) {
+            pool(input, i, j)
+            i = i - 1
+          }
+
+          // Resume scanning at the end of the pooled block.
+          i = j
+        }
+      }
+      // For points having the same prediction, we only keep two boundary points: the first
+      // point of each run (carrying the run's summed weight) and, when the run reaches a
+      // larger feature value, its right bound with weight 0.0.
+      val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+
+      var (curLabel, curFeature, curWeight) = input.head
+      var rightBound = curFeature
+      // Appends the current run to `compressed`.
+      def merge(): Unit = {
+        compressed += ((curLabel, curFeature, curWeight))
+        if (rightBound > curFeature) {
+          compressed += ((curLabel, rightBound, 0.0))
+        }
+      }
+      i = 1
+      while (i < input.length) {
+        val (label, feature, weight) = input(i)
+        if (label == curLabel) {
+          // Same label as the current run: extend it.
+          curWeight += weight
+          rightBound = feature
+        } else {
+          // Label changed: flush the current run and start a new one at this point.
+          merge()
+          curLabel = label
+          curFeature = feature
+          curWeight = weight
+          rightBound = curFeature
+        }
+        i += 1
+      }
+      // Flush the final run.
+      merge()
+
+      compressed.toArray
+    }
+  }
+
+
+ implicit val fitIR = new FitOperation[IsotonicRegression, (Double,
Double, Double)] {
+
+ override def fit(instance: IsotonicRegression,
+ fitParameters: ParameterMap,
+ input: DataSet[(Double, Double, Double)]): Unit = {
+
+ val isotonic = instance.isotonic
+
+ val preprocessedInput = if (isotonic) {
+ input
+ } else {
+ input.map(x => (-x._1, x._2, x._3))
+ }
+
+ val parallelStepResult = preprocessedInput
+ .partitionByRange(1)
+ .mapPartition(partition => {
+ val buffer = new ArrayBuffer[(Double, Double, Double)]
+ buffer ++= partition
+ Seq(buffer.sortBy(x => (x._2, x._1)).toArray)
--- End diff --
This operation effectively means that you collect all the input data on one
machine and materialize it. This won't scale at all.
> Add Isotonic Regression To ML Library
> -------------------------------------
>
> Key: FLINK-3128
> URL: https://issues.apache.org/jira/browse/FLINK-3128
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Fridtjof Sander
> Assignee: Fridtjof Sander
> Priority: Minor
>
> Isotonic Regression fits a monotonically increasing function (also called an
> isotonic function) to a set of data points in the plane.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)