Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96202871
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Jansen
    +  *
    +  * For more information about SOS, see https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
    +  *             assert(outputDS == expectedOutputDS)
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    +  *  in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between
    +  *  1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. When increasing this number, it will sacrifice accuracy in
    +  *  return for reduced computational time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: '''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN
    +    * For more information, please read the Stochastic Outlier Selection algorithm paper
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** The accepted error tolerance to save computational time when computing the affinity
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to the affinity
    +    * @return
    +    */
    +  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** The maximum number of iterations to approximate the affinity
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters ==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods ==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[LabeledVector]): DataSet[(Int, Double)] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        val vectorsWithIndex = input.map(labeledVector => {
    +          BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze)
    +        })
    +
    +        // Don't map back to a labeled-vector since the output of the algorithm is
    +        // a single double instead of vector
    +        outlierSelection(vectorsWithIndex, resultingParameters)
    +      }
    +    }
    +  }
    +
    +  /** [[TransformDataSetOperation]] applies the stochastic outlier selection algorithm on a
    +    * [[Vector]] which will transform the high-dimensionaly input to a single Double output.
    +    *
    +    * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
    +    * @return [[TransformDataSetOperation]] a single double which represents the oulierness of
    +    *         the input vectors, where the output is in [0, 1]
    +    */
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    --- End diff --
    
    Does it strictly have to be the index you're zipping with? Would a unique ID be enough? If so, then we could use the `DataSetUtils.zipWithUniqueId` method, which is considerably cheaper than `zipWithIndex`.
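
For context on the trade-off raised here: `zipWithIndex` hands out consecutive indices, so every partition has to be counted before any element can be labeled, whereas `zipWithUniqueId` only needs identifiers that cannot collide, so each partition can label its elements in a single pass. The plain-Scala sketch below models that difference on in-memory partitions. It is an illustration only, not Flink's implementation: `ZipSketch`, `Partitions`, `zipWithIndexModel` and `zipWithUniqueIdModel` are hypothetical names, and the bit-shift scheme used for the unique IDs is an assumption about one way to keep partitions from colliding.

```scala
object ZipSketch {
  // Simplified model (assumption): a distributed data set is a sequence of partitions.
  type Partitions[T] = Seq[Seq[T]]

  // Index-style labeling: consecutive 0..n-1, but it needs a counting pass first.
  def zipWithIndexModel[T](parts: Partitions[T]): Partitions[(Long, T)] = {
    // Pass 1: count every partition (a synchronization point in a distributed run).
    val counts = parts.map(_.size.toLong)
    // Offset of partition i = number of elements in all partitions before it.
    val offsets = counts.scanLeft(0L)(_ + _)
    // Pass 2: label each element with its partition offset plus local position.
    parts.zip(offsets).map { case (part, offset) =>
      part.zipWithIndex.map { case (value, i) => (offset + i, value) }
    }
  }

  // Unique-ID labeling: one pass, no knowledge of the other partitions required.
  def zipWithUniqueIdModel[T](parts: Partitions[T]): Partitions[(Long, T)] = {
    // Number of low bits reserved for the partition number.
    val shift = 64 - java.lang.Long.numberOfLeadingZeros((parts.size - 1).toLong)
    parts.zipWithIndex.map { case (part, taskId) =>
      part.zipWithIndex.map { case (value, local) =>
        // High bits: local counter; low bits: partition number.
        ((local.toLong << shift) + taskId, value)
      }
    }
  }
}
```

With the three partitions `Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))`, the index model labels the elements 0 through 4, while the unique-ID model yields 0, 4, 1, 2, 6: distinct, but with gaps. The swap is therefore only safe if the downstream code treats the value as an opaque key and never as a dense position.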

