http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala
deleted file mode 100644
index e49b3a3..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Transformer.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{ParameterMap, WithParameters}
-
-trait Transformer[Self <: Transformer[Self]]
-  extends Estimator[Self]
-  with WithParameters
-  with Serializable {
-  that: Self =>
-
-  def transform[I, O](input: DataSet[I], transformParameters: ParameterMap = 
ParameterMap.Empty)
-                     (implicit transformOperation: TransformOperation[Self, I, 
O]): DataSet[O] = {
-    transformOperation.transform(that, transformParameters, input)
-  }
-
-  def chainTransformer[T <: Transformer[T]](transformer: T): 
ChainedTransformer[Self, T] = {
-    ChainedTransformer(this, transformer)
-  }
-
-  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, 
P] = {
-    ChainedPredictor(this, predictor)
-  }
-}
-
-object Transformer{
-  implicit def fallbackChainedTransformOperation[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      LI,
-      LO,
-      RI,
-      RO]
-      (implicit transformLeft: TransformOperation[L, LI, LO],
-      transformRight: TransformOperation[R, RI, RO])
-    : TransformOperation[ChainedTransformer[L,R], LI, RO] = {
-
-    new TransformOperation[ChainedTransformer[L, R], LI, RO] {
-      override def transform(
-          chain: ChainedTransformer[L, R],
-          transformParameters: ParameterMap,
-          input: DataSet[LI]): DataSet[RO] = {
-        transformLeft.transform(chain.left, transformParameters, input)
-        transformRight.transform(chain.right, transformParameters, null)
-      }
-    }
-  }
-
-  implicit def fallbackTransformOperation[
-      Self: ClassTag,
-      IN: ClassTag,
-      OUT: ClassTag]
-    : TransformOperation[Self, IN, OUT] = {
-    new TransformOperation[Self, IN, OUT] {
-      override def transform(
-          instance: Self,
-          transformParameters: ParameterMap,
-          input: DataSet[IN])
-        : DataSet[OUT] = {
-        val self = implicitly[ClassTag[Self]]
-        val in = implicitly[ClassTag[IN]]
-        val out = implicitly[ClassTag[OUT]]
-
-        throw new RuntimeException("There is no TransformOperation defined for 
" +
-          self.runtimeClass +  " which takes a DataSet[" + in.runtimeClass +
-          "] as input and transforms it into a DataSet[" + out.runtimeClass + 
"]")
-      }
-    }
-  }
-}
-
-abstract class TransformOperation[Self, IN, OUT] extends Serializable{
-  def transform(instance: Self, transformParameters: ParameterMap, input: 
DataSet[IN]): DataSet[OUT]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
deleted file mode 100644
index 61f477c..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.feature
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer, 
LabeledVector}
-import org.apache.flink.ml.feature.PolynomialBase.Degree
-import org.apache.flink.ml.math.{DenseVector, Vector}
-
-import org.apache.flink.api.scala._
-
-/** Maps a vector into the polynomial feature space.
-  *
-  * This transformer takes a a vector of values `(x, y, z, ...)` and maps it 
into the
-  * polynomial feature space of degree `d`. That is to say, it calculates the 
following
-  * representation:
-  *
-  * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
-  *
-  * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[LabeledVector]].
-  *
-  * @example
-  *          {{{
-  *             val trainingDS: DataSet[LabeledVector] = ...
-  *
-  *             val polyBase = PolynomialBase()
-  *               .setDegree(3)
-  *
-  *             val mlr = MultipleLinearRegression()
-  *
-  *             val chained = polyBase.chain(mlr)
-  *
-  *             val model = chained.fit(trainingDS)
-  *          }}}
-  *
-  * =Parameters=
-  *
-  *  - [[PolynomialBase.Degree]]: Maximum polynomial degree
-  */
-class PolynomialBase extends Transformer[LabeledVector, LabeledVector] with 
Serializable {
-
-  def setDegree(degree: Int): PolynomialBase = {
-    parameters.add(Degree, degree)
-    this
-  }
-
-  override def transform(input: DataSet[LabeledVector], parameters: 
ParameterMap):
-  DataSet[LabeledVector] = {
-    val resultingParameters = this.parameters ++ parameters
-
-    val degree = resultingParameters(Degree)
-
-    input.map {
-      labeledVector => {
-        val vector = labeledVector.vector
-        val label = labeledVector.label
-
-        val transformedVector = calculatePolynomial(degree, vector)
-
-        LabeledVector(label, transformedVector)
-      }
-    }
-  }
-
-  private def calculatePolynomial(degree: Int, vector: Vector): Vector = {
-    new DenseVector(calculateCombinedCombinations(degree, vector).toArray)
-  }
-
-  /** Calculates for a given vector its representation in the polynomial 
feature space.
-    *
-    * @param degree Maximum degree of polynomial
-    * @param vector Values of the polynomial variables
-    * @return List of polynomial values
-    */
-  private def calculateCombinedCombinations(degree: Int, vector: Vector): 
List[Double] = {
-    if(degree == 0) {
-      List()
-    } else {
-      val partialResult = calculateCombinedCombinations(degree - 1, vector)
-
-      val combinations = calculateCombinations(vector.size, degree)
-
-      val result = combinations map {
-        combination =>
-          combination.zipWithIndex.map{
-            case (exp, idx) => math.pow(vector(idx), exp)
-          }.fold(1.0)(_ * _)
-      }
-
-      result ::: partialResult
-    }
-
-  }
-
-  /** Calculates all possible combinations of a polynom of degree `value`, 
whereas the polynom
-    * can consist of up to `length` factors. The return value is the list of 
the exponents of the
-    * individual factors
-    *
-    * @param length maximum number of factors
-    * @param value degree of polynomial
-    * @return List of lists which contain the exponents of the individual 
factors
-    */
-  private def calculateCombinations(length: Int, value: Int): List[List[Int]] 
= {
-    if(length == 0) {
-      List()
-    } else if (length == 1) {
-      List(List(value))
-    } else {
-      value to 0 by -1 flatMap {
-        v =>
-          calculateCombinations(length - 1, value - v) map {
-            v::_
-          }
-      } toList
-    }
-  }
-}
-
-object PolynomialBase{
-
-  case object Degree extends Parameter[Int] {
-    override val defaultValue: Option[Int] = Some(1)
-  }
-
-  // ========================= Factory methods 
======================================
-
-  def apply(): PolynomialBase = {
-    new PolynomialBase()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
index dffb984..fbe35d4 100644
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
@@ -68,13 +68,9 @@ object Breeze {
   }
 
   implicit class Breeze2VectorConverter(vector: BreezeVector[Double]) {
-    def fromBreeze: Vector = {
-      vector match {
-        case dense: BreezeDenseVector[Double] => new DenseVector(dense.data)
-
-        case sparse: BreezeSparseVector[Double] =>
-          new SparseVector(sparse.length, sparse.index, sparse.data)
-      }
+    def fromBreeze[T <: Vector: BreezeVectorConverter]: T = {
+      val converter = implicitly[BreezeVectorConverter[T]]
+      converter.convert(vector)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
new file mode 100644
index 0000000..687772e
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import breeze.linalg.{SparseVector => BreezeSparseVector}
+import breeze.linalg.{DenseVector => BreezeDenseVector}
+import breeze.linalg.{Vector => BreezeVector}
+
+/** Type class which allows the conversion from Breeze vectors to Flink vectors
+  *
+  * @tparam T Resulting type of the conversion
+  */
+trait BreezeVectorConverter[T <: Vector] extends Serializable {
+  /** Converts a Breeze vector into a Flink vector of type T
+    *
+    * @param vector Breeze vector
+    * @return Flink vector of type T
+    */
+  def convert(vector: BreezeVector[Double]): T
+}
+
+object BreezeVectorConverter{
+
+  /** Type class implementation for [[org.apache.flink.ml.math.DenseVector]] */
+  implicit val denseVectorConverter = new BreezeVectorConverter[DenseVector] {
+    override def convert(vector: BreezeVector[Double]): DenseVector = {
+      vector match {
+        case dense: BreezeDenseVector[Double] => new DenseVector(dense.data)
+        case sparse: BreezeSparseVector[Double] => new 
DenseVector(sparse.toDenseVector.data)
+      }
+    }
+  }
+
+  /** Type class implementation for [[org.apache.flink.ml.math.SparseVector]] 
*/
+  implicit val sparseVectorConverter = new BreezeVectorConverter[SparseVector] 
{
+    override def convert(vector: BreezeVector[Double]): SparseVector = {
+      vector match {
+        case dense: BreezeDenseVector[Double] =>
+          SparseVector.fromCOO(
+            dense.length,
+            dense.iterator.toIterable)
+        case sparse: BreezeSparseVector[Double] =>
+          new SparseVector(sparse.length, sparse.index, sparse.data)
+      }
+    }
+  }
+
+  /** Type class implementation for [[Vector]] */
+  implicit val vectorConverter = new BreezeVectorConverter[Vector] {
+    override def convert(vector: BreezeVector[Double]): Vector = {
+      vector match {
+        case dense: BreezeDenseVector[Double] => new DenseVector(dense.data)
+
+        case sparse: BreezeSparseVector[Double] =>
+          new SparseVector(sparse.length, sparse.index, sparse.data)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala
deleted file mode 100644
index b73b249..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/CanCopy.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-trait CanCopy[T] extends Serializable {
-  def copy(value: T): T
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
index fa34ae1..079e4bc 100644
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
@@ -134,8 +134,4 @@ object DenseVector {
   def init(size: Int, value: Double): DenseVector = {
     new DenseVector(Array.fill(size)(value))
   }
-
-  implicit val canCopy = new CanCopy[DenseVector]{
-    override def copy(value: DenseVector): DenseVector = value.copy
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
index 739fb9c..0b1f0cd 100644
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
@@ -72,9 +72,3 @@ trait Vector extends Serializable {
     }
   }
 }
-
-object Vector{
-  implicit val canCopy = new CanCopy[Vector] {
-    override def copy(value: Vector): Vector = value.copy
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
new file mode 100644
index 0000000..79c7005
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+/** Type class to allow the vector construction from different data types
+  *
+  * @tparam T Subtype of [[Vector]]
+  */
+trait VectorBuilder[T <: Vector] extends Serializable {
+  /** Builds a [[Vector]] of type T from a List[Double]
+    *
+    * @param data Input data where the index denotes the resulting index of 
the vector
+    * @return A vector of type T
+    */
+  def build(data: List[Double]): T
+}
+
+object VectorBuilder{
+
+  /** Type class implementation for [[org.apache.flink.ml.math.DenseVector]] */
+  implicit val denseVectorBuilder = new VectorBuilder[DenseVector] {
+    override def build(data: List[Double]): DenseVector = {
+      new DenseVector(data.toArray)
+    }
+  }
+
+  /** Type class implementation for [[org.apache.flink.ml.math.SparseVector]] 
*/
+  implicit val sparseVectorBuilder = new VectorBuilder[SparseVector] {
+    override def build(data: List[Double]): SparseVector = {
+      // Enrich elements with explicit indices and filter out zero entries
+      SparseVector.fromCOO(data.length, (0 until 
data.length).zip(data).filter(_._2 != 0.0))
+    }
+  }
+
+  /** Type class implementation for [[Vector]] */
+  implicit val vectorBuilder = new VectorBuilder[Vector] {
+    override def build(data: List[Double]): Vector = {
+      new DenseVector(data.toArray)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
index e0f43d6..4c7f254 100644
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
@@ -107,6 +107,4 @@ package object math {
 
     }
   }
-
-  def copy[T](value: T)(implicit canCopy: CanCopy[T]): T = canCopy.copy(value)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
new file mode 100644
index 0000000..85a5b9e
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml.common.ParameterMap
+
+/** [[Predictor]] which represents a pipeline of possibly multiple 
[[Transformer]] and a trailing
+  * [[Predictor]].
+  *
+  * The [[ChainedPredictor]] can be used as a regular [[Predictor]]. Upon 
calling the fit method,
+  * the input data is piped through all preceding [[Transformer]] in the 
pipeline and the resulting
+  * data is given to the trailing [[Predictor]]. The same holds true for the 
predict operation.
+  *
+  * The pipeline mechanism has been inspired by scikit-learn
+  *
+  * @param transformer Preceding [[Transformer]] of the pipeline
+  * @param predictor Trailing [[Predictor]] of the pipeline
+  * @tparam T Type of the preceding [[Transformer]]
+  * @tparam P Type of the trailing [[Predictor]]
+  */
+case class ChainedPredictor[T <: Transformer[T], P <: 
Predictor[P]](transformer: T, predictor: P)
+  extends Predictor[ChainedPredictor[T, P]]{}
+
+object ChainedPredictor{
+
+  /** [[PredictOperation]] for the [[ChainedPredictor]].
+    *
+    * The [[PredictOperation]] requires the [[TransformOperation]] of the 
preceding [[Transformer]]
+    * and the [[PredictOperation]] of the trailing [[Predictor]]. Upon calling 
predict, the testing
+    * data is first transformed by the preceding [[Transformer]] and the 
result is then used to
+    * calculate the prediction via the trailing [[Predictor]].
+    *
+    * @param transformOperation [[TransformOperation]] for the preceding 
[[Transformer]]
+    * @param predictOperation [[PredictOperation]] for the trailing 
[[Predictor]]
+    * @tparam T Type of the preceding [[Transformer]]
+    * @tparam P Type of the trailing [[Predictor]]
+    * @tparam Testing Type of the testing data
+    * @tparam Intermediate Type of the intermediate data produced by the 
preceding [[Transformer]]
+    * @tparam Prediction Type of the predicted data generated by the trailing 
[[Predictor]]
+    * @return
+    */
+  implicit def chainedPredictOperation[
+      T <: Transformer[T],
+      P <: Predictor[P],
+      Testing,
+      Intermediate,
+      Prediction](
+      implicit transformOperation: TransformOperation[T, Testing, 
Intermediate],
+      predictOperation: PredictOperation[P, Intermediate, Prediction])
+    : PredictOperation[ChainedPredictor[T, P], Testing, Prediction] = {
+
+    new PredictOperation[ChainedPredictor[T, P], Testing, Prediction] {
+      override def predict(
+          instance: ChainedPredictor[T, P],
+          predictParameters: ParameterMap,
+          input: DataSet[Testing])
+        : DataSet[Prediction] = {
+
+        val testing = instance.transformer.transform(input, predictParameters)
+        instance.predictor.predict(testing, predictParameters)
+      }
+    }
+  }
+
+  /** [[FitOperation]] for the [[ChainedPredictor]].
+    *
+    * The [[FitOperation]] requires the [[FitOperation]] and the 
[[TransformOperation]] of the
+    * preceding [[Transformer]] as well as the [[FitOperation]] of the 
trailing [[Predictor]].
+    * Upon calling fit, the preceding [[Transformer]] is first fitted to the 
training data.
+    * The training data is then transformed by the fitted [[Transformer]]. The 
transformed data
+    * is then used to fit the [[Predictor]].
+    *
+    * @param fitOperation [[FitOperation]] of the preceding [[Transformer]]
+    * @param transformOperation [[TransformOperation]] of the preceding 
[[Transformer]]
+    * @param predictorFitOperation [[PredictOperation]] of the trailing 
[[Predictor]]
+    * @tparam L Type of the preceding [[Transformer]]
+    * @tparam R Type of the trailing [[Predictor]]
+    * @tparam I Type of the training data
+    * @tparam T Type of the intermediate data
+    * @return
+    */
+  implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, 
T](implicit
+    fitOperation: FitOperation[L, I],
+    transformOperation: TransformOperation[L, I, T],
+    predictorFitOperation: FitOperation[R, T]): 
FitOperation[ChainedPredictor[L, R], I] = {
+    new FitOperation[ChainedPredictor[L, R], I] {
+      override def fit(
+          instance: ChainedPredictor[L, R],
+          fitParameters: ParameterMap,
+          input: DataSet[I])
+        : Unit = {
+        instance.transformer.fit(input, fitParameters)
+        val intermediateResult = instance.transformer.transform(input, 
fitParameters)
+        instance.predictor.fit(intermediateResult, fitParameters)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
new file mode 100644
index 0000000..e443b80
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml.common.ParameterMap
+
+/** [[Transformer]] which represents the chaining of two [[Transformer]].
+  *
+  * A [[ChainedTransformer]] can be treated as regular [[Transformer]]. Upon 
calling the fit or
+  * transform operation, the data is piped through all [[Transformer]] of the 
pipeline.
+  *
+  * The pipeline mechanism has been inspired by scikit-learn
+  *
+  * @param left Left [[Transformer]] of the pipeline
+  * @param right Right [[Transformer]] of the pipeline
+  * @tparam L Type of the left [[Transformer]]
+  * @tparam R Type of the right [[Transformer]]
+  */
+case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: 
L, right: R)
+  extends Transformer[ChainedTransformer[L, R]] {
+}
+
+object ChainedTransformer{
+
+  /** [[TransformOperation]] implementation for [[ChainedTransformer]].
+    *
+    * First the transform operation of the left [[Transformer]] is called with 
the input data. This
+    * generates intermediate data which is fed to the right [[Transformer]]'s 
transform operation.
+    *
+    * @param transformOpLeft [[TransformOperation]] for the left 
[[Transformer]]
+    * @param transformOpRight [[TransformOperation]] for the right 
[[Transformer]]
+    * @tparam L Type of the left [[Transformer]]
+    * @tparam R Type of the right [[Transformer]]
+    * @tparam I Type of the input data
+    * @tparam T Type of the intermediate output data
+    * @tparam O Type of the output data
+    * @return
+    */
+  implicit def chainedTransformOperation[
+      L <: Transformer[L],
+      R <: Transformer[R],
+      I,
+      T,
+      O](implicit
+      transformOpLeft: TransformOperation[L, I, T],
+      transformOpRight: TransformOperation[R, T, O])
+    : TransformOperation[ChainedTransformer[L,R], I, O] = {
+
+    new TransformOperation[ChainedTransformer[L, R], I, O] {
+      override def transform(
+          chain: ChainedTransformer[L, R],
+          transformParameters: ParameterMap,
+          input: DataSet[I]): DataSet[O] = {
+        val intermediateResult = transformOpLeft.transform(chain.left, 
transformParameters, input)
+        transformOpRight.transform(chain.right, transformParameters, 
intermediateResult)
+      }
+    }
+  }
+
+  /** [[FitOperation]] implementation for [[ChainedTransformer]].
+    *
+    * First the fit operation of the left [[Transformer]] is called with the 
input data. Then
+    * the data is transformed by this [[Transformer]] and the given to the fit 
operation of the
+    * right [[Transformer]].
+    *
+    * @param leftFitOperation [[FitOperation]] for the left [[Transformer]]
+    * @param leftTransformOperation [[TransformOperation]] for the left 
[[Transformer]]
+    * @param rightFitOperation [[FitOperation]] for the right [[Transformer]]
+    * @tparam L Type of the left [[Transformer]]
+    * @tparam R Type of the right [[Transformer]]
+    * @tparam I Type of the input data
+    * @tparam T Type of the intermediate output data
+    * @return
+    */
+  implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], 
I, T](implicit
+      leftFitOperation: FitOperation[L, I],
+      leftTransformOperation: TransformOperation[L, I, T],
+      rightFitOperation: FitOperation[R, T]): 
FitOperation[ChainedTransformer[L, R], I] = {
+    new FitOperation[ChainedTransformer[L, R], I] {
+      override def fit(
+          instance: ChainedTransformer[L, R],
+          fitParameters: ParameterMap,
+          input: DataSet[I]): Unit = {
+        instance.left.fit(input, fitParameters)
+        val intermediateResult = instance.left.transform(input, fitParameters)
+        instance.right.fit(intermediateResult, fitParameters)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
new file mode 100644
index 0000000..6acac8f
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml.common.{ParameterMap, WithParameters}
+
+/** Base trait for Flink's pipeline operators.
+  *
+  * An estimator can be fitted to input data. In order to do that the 
implementing class has
+  * to provide an implementation of a [[FitOperation]] with the correct input 
type. In order to make
+  * the [[FitOperation]] retrievable by the Scala compiler, the implementation 
should be placed
+  * in the companion object of the implementing class.
+  *
+  * The pipeline mechanism has been inspired by scikit-learn
+  *
+  * @tparam Self
+  */
+trait Estimator[Self] extends WithParameters with Serializable {
+  that: Self =>
+
+  /** Fits the estimator to the given input data. The fitting logic is 
contained in the
+    * [[FitOperation]]. The computed state will be stored in the implementing 
class.
+    *
+    * @param training Training data
+    * @param fitParameters Additional parameters for the [[FitOperation]]
+    * @param fitOperation [[FitOperation]] which encapsulates the algorithm 
logic
+    * @tparam Training Type of the training data
+    * @return
+    */
+  def fit[Training](
+      training: DataSet[Training],
+      fitParameters: ParameterMap = ParameterMap.Empty)(implicit
+      fitOperation: FitOperation[Self, Training]): Unit = {
+    fitOperation.fit(this, fitParameters, training)
+  }
+}
+
+object Estimator{
+
+  /** Fallback [[FitOperation]] type class implementation which is used if no 
other
+    * [[FitOperation]] with the right input types could be found in the scope 
of the implementing
+    * class. The fallback [[FitOperation]] makes the system fail in the 
pre-flight phase by
+    * throwing a [[RuntimeException]] which states the reason for the failure. 
Usually the error
+    * is a missing [[FitOperation]] implementation for the input types or the 
wrong chaining
+    * of pipeline operators which have incompatible input/output types.
+    *
+     * @tparam Self Type of the pipeline operator
+    * @tparam Training Type of training data
+    * @return
+    */
+  implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag]
+    : FitOperation[Self, Training] = {
+    new FitOperation[Self, Training]{
+      override def fit(
+          instance: Self,
+          fitParameters: ParameterMap,
+          input: DataSet[Training])
+        : Unit = {
+
+          val self = implicitly[ClassTag[Self]]
+          val training = implicitly[ClassTag[Training]]
+
+          throw new RuntimeException("There is no FitOperation defined for " + 
self.runtimeClass +
+            " which trains on a DataSet[" + training.runtimeClass + "]")
+        }
+      }
+    }
+
+  /** Fallback [[FitOperation]] type class implementation for 
[[ChainedTransformer]]. The fallback
+    * implementation is used if the Scala compiler could not instantiate the 
chained fit operation
+    * defined in the companion object of [[ChainedTransformer]]. This is 
usually the case if either
+    * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated 
for one of the
+    * leaves of the chained transformer. The fallback [[FitOperation]] calls 
the first the
+    * fit operation of the left transformer, then the transform operation of 
the left transformer
+    * and last the fit operation of the right transformer.
+    *
+    * @param leftFitOperation [[FitOperation]] of the left transformer
+    * @param leftTransformOperation [[TransformOperation]] of the left 
transformer
+    * @param rightFitOperaiton [[FitOperation]] of the right transformer
+    * @tparam L Type of left transformer
+    * @tparam R Type of right transformer
+    * @tparam LI Input type of left transformer's [[FitOperation]]
+    * @tparam LO Output type of left transformer's [[TransformOperation]]
+    * @return
+    */
+  implicit def fallbackChainedFitOperationTransformer[
+      L <: Transformer[L],
+      R <: Transformer[R],
+      LI,
+      LO](implicit
+      leftFitOperation: FitOperation[L, LI],
+      leftTransformOperation: TransformOperation[L, LI, LO],
+      rightFitOperaiton: FitOperation[R, LO])
+    : FitOperation[ChainedTransformer[L, R], LI] = {
+    new FitOperation[ChainedTransformer[L, R], LI] {
+      override def fit(
+          instance: ChainedTransformer[L, R],
+          fitParameters: ParameterMap,
+          input: DataSet[LI]): Unit = {
+        instance.left.fit(input, fitParameters)
+        val intermediate = instance.left.transform(input, fitParameters)
+        instance.right.fit(intermediate, fitParameters)
+      }
+    }
+  }
+
+  /** Fallback [[FitOperation]] type class implementation for 
[[ChainedPredictor]]. The fallback
+    * implementation is used if the Scala compiler could not instantiate the 
chained fit operation
+    * defined in the companion object of [[ChainedPredictor]]. This is usually 
the case if either
+    * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated 
for one of the
+    * leaves of the chained transformer. The fallback [[FitOperation]] calls 
the first the
+    * fit operation of the left transformer, then the transform operation of 
the left transformer
+    * and last the fit operation of the right transformer.
+    *
+    * @param leftFitOperation [[FitOperation]] of the left transformer
+    * @param leftTransformOperation [[TransformOperation]] of the left 
transformer
+    * @param rightFitOperaiton [[FitOperation]] of the right transformer
+    * @tparam L Type of left transformer
+    * @tparam R Type of right transformer
+    * @tparam LI Input type of left transformer's [[FitOperation]]
+    * @tparam LO Output type of left transformer's [[TransformOperation]]
+    * @return
+    */
+  implicit def fallbackChainedFitOperationPredictor[
+  L <: Transformer[L],
+  R <: Predictor[R],
+  LI,
+  LO](implicit
+    leftFitOperation: FitOperation[L, LI],
+    leftTransformOperation: TransformOperation[L, LI, LO],
+    rightFitOperaiton: FitOperation[R, LO])
+  : FitOperation[ChainedPredictor[L, R], LI] = {
+    new FitOperation[ChainedPredictor[L, R], LI] {
+      override def fit(
+          instance: ChainedPredictor[L, R],
+          fitParameters: ParameterMap,
+          input: DataSet[LI]): Unit = {
+        instance.transformer.fit(input, fitParameters)
+        val intermediate = instance.transformer.transform(input, fitParameters)
+        instance.predictor.fit(intermediate, fitParameters)
+      }
+    }
+  }
+}
+
+/** Type class for the fit operation of an [[Estimator]].
+  *
+  * The [[FitOperation]] contains a self type parameter so that the Scala 
compiler looks into
+  * the companion object of this class to find implicit values.
+  *
+  * @tparam Self Type of the [[Estimator]] subclass for which the 
[[FitOperation]] is defined
+  * @tparam Training Type of the training data
+  */
+trait FitOperation[Self, Training]{
+  def fit(instance: Self, fitParameters: ParameterMap,  input: 
DataSet[Training]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
new file mode 100644
index 0000000..ebfa787
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml.common.{ParameterMap, WithParameters}
+
+/** Predictor trait for Flink's pipeline operators.
+  *
+  * A [[Predictor]] calculates predictions for testing data based on the model 
it learned during
+  * the fit operation (training phase). In order to do that, the implementing 
class has to provide
+  * a [[FitOperation]] and a [[PredictOperation]] implementation for the 
correct types. The implicit
+  * values should be put into the scope of the companion object of the 
implementing class to make
+  * them retrievable for the Scala compiler.
+  *
+  * The pipeline mechanism has been inspired by scikit-learn
+  *
+  * @tparam Self Type of the implementing class
+  */
+trait Predictor[Self] extends Estimator[Self] with WithParameters with 
Serializable {
+  that: Self =>
+
+  /** Predict testing data according the learned model. The implementing class 
has to provide
+    * a corresponding implementation of [[PredictOperation]] which contains 
the prediction logic.
+    *
+    * @param testing Testing data which shall be predicted
+    * @param predictParameters Additional parameters for the prediction
+    * @param predictor [[PredictOperation]] which encapsulates the prediction 
logic
+    * @tparam Testing Type of the testing data
+    * @tparam Prediction Type of the prediction data
+    * @return
+    */
+  def predict[Testing, Prediction](
+      testing: DataSet[Testing],
+      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+      predictor: PredictOperation[Self, Testing, Prediction])
+    : DataSet[Prediction] = {
+    predictor.predict(this, predictParameters, testing)
+  }
+}
+
+object Predictor{
+
+  /** Fallback [[PredictOperation]] if a [[Predictor]] is called with a not 
supported input data
+    * type. The fallback [[PredictOperation]] lets the system fail with a 
[[RuntimeException]]
+    * stating which input and output data types were inferred but for which no 
[[PredictOperation]]
+    * could be found.
+    *
+    * @tparam Self Type of the [[Predictor]]
+    * @tparam Testing Type of the testing data
+    * @tparam Prediction Type of the predicted data
+    * @return
+    */
+  implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag, 
Prediction: ClassTag]
+    : PredictOperation[Self, Testing, Prediction] = {
+    new PredictOperation[Self, Testing, Prediction] {
+      override def predict(
+          instance: Self,
+          predictParameters: ParameterMap,
+          input: DataSet[Testing])
+        : DataSet[Prediction] = {
+        val self = implicitly[ClassTag[Self]]
+        val testing = implicitly[ClassTag[Testing]]
+        val prediction = implicitly[ClassTag[Prediction]]
+
+        throw new RuntimeException("There is no PredictOperation defined for " 
+ self.runtimeClass +
+          " which takes a DataSet[" + testing.runtimeClass + "] as input and 
returns a DataSet[" +
+          prediction.runtimeClass + "]")
+      }
+    }
+  }
+
+  /** Fallback [[PredictOperation]] for a [[ChainedPredictor]] if a 
[[TransformOperation]] for
+    * one of the [[Transformer]] and its respective types or the 
[[PredictOperation]] for the
+    * [[Predictor]] and its respective type could not be found. This is 
usually the case, if the
+    * the pipeline contains pipeline operators which work on incompatible 
types.
+    *
+    * The fallback [[PredictOperation]] first transforms the input data by 
calling the transform
+    * method of the [[Transformer]] and then the predict method of the 
[[Predictor]].
+    *
+    * @param leftTransformOperation [[TransformOperation]] of the 
[[Transformer]]
+    * @param rightPredictOperation [[PredictOperation]] of the [[Predictor]]
+    * @tparam L Type of the [[Transformer]]
+    * @tparam R Type of the [[Predictor]]
+    * @tparam LI Input type of the [[Transformer]]
+    * @tparam LO Output type of the [[Transformer]]
+    * @tparam RO Prediction type of the [[Predictor]]
+    * @return
+    */
+  implicit def fallbackChainedPredictOperation[
+      L <: Transformer[L],
+      R <: Predictor[R],
+      LI,
+      LO,
+      RO](implicit
+      leftTransformOperation: TransformOperation[L, LI, LO],
+      rightPredictOperation: PredictOperation[R, LO, RO]
+      )
+    : PredictOperation[ChainedPredictor[L, R], LI, RO] = {
+    new PredictOperation[ChainedPredictor[L, R], LI, RO] {
+      override def predict(
+          instance: ChainedPredictor[L, R],
+          predictParameters: ParameterMap,
+          input: DataSet[LI]): DataSet[RO] = {
+        val intermediate = instance.transformer.transform(input, 
predictParameters)
+        instance.predictor.predict(intermediate, predictParameters)
+      }
+    }
+  }
+}
+
+/** Type class for the predict operation of [[Predictor]].
+  *
+  * Predictors have to implement this trait and make the result available as 
an implicit value or
+  * function in the scope of their companion objects.
+  *
+  * The first type parameter is the type of the implementing [[Predictor]] 
class so that the Scala
+  * compiler includes the companion object of this class in the search scope 
for the implicit
+  * values.
+  *
+  * @tparam Self Type of [[Predictor]] implementing class
+  * @tparam Testing Type of testing data
+  * @tparam Prediction Type of predicted data
+  */
+trait PredictOperation[Self, Testing, Prediction]{
+  def predict(
+      instance: Self,
+      predictParameters: ParameterMap,
+      input: DataSet[Testing])
+    : DataSet[Prediction]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
new file mode 100644
index 0000000..52e3f7f
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml.common.{ParameterMap, WithParameters}
+
+/** Transformer trait for Flink's pipeline operators.
+  *
+  * A Transformer transforms a [[DataSet]] of an input type into a [[DataSet]] 
of an output type.
+  * Furthermore, a [[Transformer]] is also an [[Estimator]], because some 
transformations depend
+  * on the training data. In order to do that the implementing class has to 
provide a
+  * [[TransformOperation]] and [[FitOperation]] implementation. The Scala 
compiler finds these
+  * implicit values if it is put in the scope of the companion object of the 
implementing class.
+  *
+  * [[Transformer]] can be chained with other [[Transformer]] and 
[[Predictor]] to create
+  * pipelines. These pipelines can consist of an arbitrary number of 
[[Transformer]] and at most
+  * one trailing [[Predictor]].
+  *
+  * The pipeline mechanism has been inspired by scikit-learn
+  *
+  * @tparam Self
+  */
+trait Transformer[Self <: Transformer[Self]]
+  extends Estimator[Self]
+  with WithParameters
+  with Serializable {
+  that: Self =>
+
+  /** Transform operation which transforms an input [[DataSet]] of type I into 
an ouptut [[DataSet]]
+    * of type O. The actual transform operation is implemented within the 
[[TransformOperation]].
+    *
+    * @param input Input [[DataSet]] of type I
+    * @param transformParameters Additional parameters for the 
[[TransformOperation]]
+    * @param transformOperation [[TransformOperation]] which encapsulates the 
algorithm's logic
+    * @tparam Input Input data type
+    * @tparam Output Ouptut data type
+    * @return
+    */
+  def transform[Input, Output](
+      input: DataSet[Input],
+      transformParameters: ParameterMap = ParameterMap.Empty)
+      (implicit transformOperation: TransformOperation[Self, Input, Output])
+    : DataSet[Output] = {
+    transformOperation.transform(that, transformParameters, input)
+  }
+
+  /** Chains two [[Transformer]] to form a [[ChainedTransformer]].
+    *
+    * @param transformer Right side transformer of the resulting pipeline
+    * @tparam T Type of the [[Transformer]]
+    * @return
+    */
+  def chainTransformer[T <: Transformer[T]](transformer: T): 
ChainedTransformer[Self, T] = {
+    ChainedTransformer(this, transformer)
+  }
+
+  /** Chains a [[Transformer]] with a [[Predictor]] to form a 
[[ChainedPredictor]].
+    *
+    * @param predictor Trailing [[Predictor]] of the resulting pipeline
+    * @tparam P Type of the [[Predictor]]
+    * @return
+    */
+  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, 
P] = {
+    ChainedPredictor(this, predictor)
+  }
+}
+
+object Transformer{
+
+  /** Fallback [[TransformOperation]] for [[ChainedTransformer]] which is used 
if no suitable
+    * [[TransformOperation]] implementation can be found. This implementation 
is used if there is no
+    * [[TransformOperation]] for one of the leaves of the 
[[ChainedTransformer]] for the given
+    * input types. This is usually the case if one [[Transformer]] does not 
support the transform
+    * operation for the input type.
+    *
+    * The fallback [[TransformOperation]] for [[ChainedTransformer]] calls 
first the transform
+    * operation of the left transformer and then the transform operation of 
the right transformer.
+    * That way the fallback [[TransformOperation]] for a [[Transformer]] will 
be called which
+    * will fail the job in the pre-flight phase by throwing an exception.
+    *
+    * @param transformLeft Left [[Transformer]] of the pipeline
+    * @param transformRight Right [[Transformer]] of the pipeline
+    * @tparam L Type of the left [[Transformer]]
+    * @tparam R Type of the right [[Transformer]]
+    * @tparam LI Input type of left transformer's [[TransformOperation]]
+    * @tparam LO Output type of left transformer's [[TransformOperation]]
+    * @tparam RO Output type of right transformer's [[TransformOperation]]
+    * @return
+    */
+  implicit def fallbackChainedTransformOperation[
+      L <: Transformer[L],
+      R <: Transformer[R],
+      LI,
+      LO,
+      RO]
+      (implicit transformLeft: TransformOperation[L, LI, LO],
+      transformRight: TransformOperation[R, LO, RO])
+    : TransformOperation[ChainedTransformer[L,R], LI, RO] = {
+
+    new TransformOperation[ChainedTransformer[L, R], LI, RO] {
+      override def transform(
+          chain: ChainedTransformer[L, R],
+          transformParameters: ParameterMap,
+          input: DataSet[LI]): DataSet[RO] = {
+        val intermediate = transformLeft.transform(chain.left, 
transformParameters, input)
+        transformRight.transform(chain.right, transformParameters, 
intermediate)
+      }
+    }
+  }
+
+  /** Fallback [[TransformOperation]] for [[Transformer]] which do not support 
the input or output
+    * type with which they are called. This is usualy the case if pipeline 
operators are chained
+    * which have incompatible input/output types. In order to detect these 
failures, the fallback
+    * [[TransformOperation]] throws a [[RuntimeException]] with the 
corresponding input/output
+    * types. Consequently, a wrong pipeline will be detected at pre-flight 
phase of Flink and
+    * thus prior to execution time.
+    *
+    * @tparam Self Type of the [[Transformer]] for which the 
[[TransformOperation]] is defined
+    * @tparam IN Input data type of the [[TransformOperation]]
+    * @tparam OUT Output data type of the [[TransformOperation]]
+    * @return
+    */
+  implicit def fallbackTransformOperation[
+      Self: ClassTag,
+      IN: ClassTag,
+      OUT: ClassTag]
+    : TransformOperation[Self, IN, OUT] = {
+    new TransformOperation[Self, IN, OUT] {
+      override def transform(
+          instance: Self,
+          transformParameters: ParameterMap,
+          input: DataSet[IN])
+        : DataSet[OUT] = {
+        val self = implicitly[ClassTag[Self]]
+        val in = implicitly[ClassTag[IN]]
+        val out = implicitly[ClassTag[OUT]]
+
+        throw new RuntimeException("There is no TransformOperation defined for 
" +
+          self.runtimeClass +  " which takes a DataSet[" + in.runtimeClass +
+          "] as input and transforms it into a DataSet[" + out.runtimeClass + 
"]")
+      }
+    }
+  }
+}
+
+/** Type class for a transform operation of [[Transformer]].
+  *
+  * The [[TransformOperation]] contains a self type parameter so that the 
Scala compiler looks into
+  * the companion object of this class to find implicit values.
+  *
+  * @tparam Self Type of the [[Transformer]] for which the 
[[TransformOperation]] is defined
+  * @tparam Input Input data type
+  * @tparam Output Ouptut data type
+  */
+abstract class TransformOperation[Self, Input, Output] extends Serializable{
+  def transform(
+      instance: Self,
+      transformParameters: ParameterMap,
+      input: DataSet[Input])
+    : DataSet[Output]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
new file mode 100644
index 0000000..8c7daad
--- /dev/null
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
+import org.apache.flink.ml.math.{Vector, VectorBuilder}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, 
Transformer}
+import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree
+
+import scala.reflect.ClassTag
+
+/** Maps a vector into the polynomial feature space.
+  *
+  * This transformer takes a a vector of values `(x, y, z, ...)` and maps it 
into the
+  * polynomial feature space of degree `d`. That is to say, it calculates the 
following
+  * representation:
+  *
+  * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
+  *
+  * This transformer can be prepended to all 
[[org.apache.flink.ml.pipeline.Transformer]] and
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect an 
input of
+  * [[LabeledVector]].
+  *
+  * @example
+  *          {{{
+  *             val trainingDS: DataSet[LabeledVector] = ...
+  *
+  *             val polyFeatures = PolynomialFeatures()
+  *               .setDegree(3)
+  *
+  *             val mlr = MultipleLinearRegression()
+  *
+  *             val pipeline = polyFeatures.chainPredictor(mlr)
+  *
+  *             pipeline.fit(trainingDS)
+  *          }}}
+  *
+  * =Parameters=
+  *
+  *  - [[org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree]]: 
Maximum polynomial degree
+  */
+class PolynomialFeatures extends Transformer[PolynomialFeatures] {
+
+  def setDegree(degree: Int): PolynomialFeatures = {
+    parameters.add(Degree, degree)
+    this
+  }
+}
+
+object PolynomialFeatures{
+
+  // ====================================== Parameters 
=============================================
+
+  case object Degree extends Parameter[Int] {
+    override val defaultValue: Option[Int] = Some(1)
+  }
+
+  // =================================== Factory methods 
===========================================
+
+  def apply(): PolynomialFeatures = {
+    new PolynomialFeatures()
+  }
+
+  // ====================================== Operations 
=============================================
+
+  /** The [[PolynomialFeatures]] transformer does not need a fitting phase.
+    *
+    * @tparam T The fitting works with arbitrary input types
+    * @return
+    */
+  implicit def fitNoOp[T] = {
+    new FitOperation[PolynomialFeatures, T]{
+      override def fit(
+          instance: PolynomialFeatures,
+          fitParameters: ParameterMap,
+          input: DataSet[T])
+        : Unit = {}
+    }
+  }
+
+  /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a [[Vector]] 
into the polynomial
+    * feature space.
+    *
+    * @tparam T Subclass of [[Vector]]
+    * @return
+    */
+  implicit def transformVectorIntoPolynomialBase[
+      T <: Vector : VectorBuilder: TypeInformation: ClassTag
+    ] = {
+    new TransformOperation[PolynomialFeatures, T, T] {
+      override def transform(
+          instance: PolynomialFeatures,
+          transformParameters: ParameterMap,
+          input: DataSet[T])
+        : DataSet[T] = {
+        val resultingParameters = instance.parameters ++ transformParameters
+
+        val degree = resultingParameters(Degree)
+
+        input.map {
+          vector => {
+            calculatePolynomial(degree, vector)
+          }
+        }
+      }
+    }
+  }
+
+  /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a 
[[LabeledVector]] into the
+    * polynomial feature space
+    */
+  implicit val transformLabeledVectorIntoPolynomialBase =
+    new TransformOperation[PolynomialFeatures, LabeledVector, LabeledVector] {
+
+    override def transform(
+        instance: PolynomialFeatures,
+        transformParameters: ParameterMap,
+        input: DataSet[LabeledVector])
+      : DataSet[LabeledVector] = {
+      val resultingParameters = instance.parameters ++ transformParameters
+
+      val degree = resultingParameters(Degree)
+
+      input.map {
+        labeledVector => {
+          val vector = labeledVector.vector
+          val label = labeledVector.label
+
+          val transformedVector = calculatePolynomial(degree, vector)
+
+          LabeledVector(label, transformedVector)
+        }
+      }
+    }
+  }
+
+
+  private def calculatePolynomial[T <: Vector: VectorBuilder](degree: Int, 
vector: T): T = {
+    val builder = implicitly[VectorBuilder[T]]
+    builder.build(calculateCombinedCombinations(degree, vector))
+  }
+
+  /** Calculates for a given vector its representation in the polynomial 
feature space.
+    *
+    * @param degree Maximum degree of polynomial
+    * @param vector Values of the polynomial variables
+    * @return List of polynomial values
+    */
+  private def calculateCombinedCombinations(degree: Int, vector: Vector): 
List[Double] = {
+    if(degree == 0) {
+      List()
+    } else {
+      val partialResult = calculateCombinedCombinations(degree - 1, vector)
+
+      val combinations = calculateCombinations(vector.size, degree)
+
+      val result = combinations map {
+        combination =>
+          combination.zipWithIndex.map{
+            case (exp, idx) => math.pow(vector(idx), exp)
+          }.fold(1.0)(_ * _)
+      }
+
+      result ::: partialResult
+    }
+
+  }
+
+  /** Calculates all possible combinations of a polynom of degree `value`, 
whereas the polynom
+    * can consist of up to `length` factors. The return value is the list of 
the exponents of the
+    * individual factors
+    *
+    * @param length maximum number of factors
+    * @param value degree of polynomial
+    * @return List of lists which contain the exponents of the individual 
factors
+    */
+  private def calculateCombinations(length: Int, value: Int): List[List[Int]] 
= {
+    if(length == 0) {
+      List()
+    } else if (length == 1) {
+      List(List(value))
+    } else {
+      value to 0 by -1 flatMap {
+        v =>
+          calculateCombinations(length - 1, value - v) map {
+            v::_
+          }
+      } toList
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 9224e7c..bd952c3 100644
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -22,38 +22,47 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, 
Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean and 
standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and 
standard deviation.
+  * This transformer takes a subtype of  [[Vector]] of values and maps it to a
+  * scaled subtype of [[Vector]] such that each feature has a user-specified 
mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as 
input a subtype
+  * of [[Vector]].
   *
   * @example
   *          {{{
   *            val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *            val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *            transformer.transform(trainingDS)
+  *            transformer.fit(trainingDS)
+  *            val transformedDS = transformer.transform(trainingDS)
   *          }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed data 
set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by default
   * equal to 1
   */
-class StandardScaler extends Transformer[Vector, Vector] with Serializable {
+class StandardScaler extends Transformer[StandardScaler] {
+
+
+  var metricsOption: Option[DataSet[(linalg.Vector[Double], 
linalg.Vector[Double])]] = None
 
   /** Sets the target mean of the transformed data
     *
@@ -78,36 +87,62 @@ class StandardScaler extends Transformer[Vector, Vector] 
with Serializable {
     parameters.add(Std, std)
     this
   }
+}
 
-  override def transform(input: DataSet[Vector], parameters: ParameterMap):
-  DataSet[Vector] = {
-    val resultingParameters = this.parameters ++ parameters
-    val mean = resultingParameters(Mean)
-    val std = resultingParameters(Std)
+object StandardScaler {
+
+  // ====================================== Parameters 
=============================================
 
-    val featureMetrics = extractFeatureMetrics(input)
+  case object Mean extends Parameter[Double] {
+    override val defaultValue: Option[Double] = Some(0.0)
+  }
 
-    input.map(new RichMapFunction[Vector, Vector]() {
+  case object Std extends Parameter[Double] {
+    override val defaultValue: Option[Double] = Some(1.0)
+  }
 
-      var broadcastMean: linalg.Vector[Double] = null
-      var broadcastStd: linalg.Vector[Double] = null
+  // ==================================== Factory methods 
==========================================
 
-      override def open(parameters: Configuration): Unit = {
-        val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
-          linalg.Vector[Double])]("broadcastedMetrics").get(0)
-        broadcastMean = broadcastedMetrics._1
-        broadcastStd = broadcastedMetrics._2
-      }
+  def apply(): StandardScaler = {
+    new StandardScaler()
+  }
+
+  // ====================================== Operations 
=============================================
+
+  /** Trains the [[org.apache.flink.ml.preprocessing.StandardScaler]] by 
learning the mean and
+    * standard deviation of the training data. These values are used inthe 
transform step
+    * to transform the given input data.
+    *
+    * @tparam T Input data type which is a subtype of [[Vector]]
+    * @return
+    */
+  implicit def fitVectorStandardScaler[T <: Vector] = new 
FitOperation[StandardScaler, T] {
+    override def fit(instance: StandardScaler, fitParameters: ParameterMap, 
input: DataSet[T])
+      : Unit = {
+      val metrics = extractFeatureMetrics(input)
+
+      instance.metricsOption = Some(metrics)
+    }
+  }
 
-      override def map(vector: Vector): Vector = {
-        var myVector = vector.asBreeze
+  /** Trains the [[StandardScaler]] by learning the mean and standard 
deviation of the training
+    * data which is of type [[LabeledVector]]. The mean and standard deviation 
are used to
+    * transform the given input data.
+    *
+    */
+  implicit val fitLabeledVectorStandardScaler = {
+    new FitOperation[StandardScaler, LabeledVector] {
+      override def fit(
+          instance: StandardScaler,
+          fitParameters: ParameterMap,
+          input: DataSet[LabeledVector])
+        : Unit = {
+        val vectorDS = input.map(_.vector)
+        val metrics = extractFeatureMetrics(vectorDS)
 
-        myVector -= broadcastMean
-        myVector :/= broadcastStd
-        myVector = (myVector :* std) + mean
-        return myVector.fromBreeze
+        instance.metricsOption = Some(metrics)
       }
-    }).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+    }
   }
 
   /** Calculates in one pass over the data the features' mean and standard 
deviation.
@@ -121,7 +156,7 @@ class StandardScaler extends Transformer[Vector, Vector] 
with Serializable {
     *          The first vector represents the mean vector and the second is 
the standard
     *          deviation vector.
     */
-  private def extractFeatureMetrics(dataSet: DataSet[Vector])
+  private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
   : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
     val metrics = dataSet.map{
       v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
@@ -154,19 +189,100 @@ class StandardScaler extends Transformer[Vector, Vector] 
with Serializable {
     }
     metrics
   }
-}
 
-object StandardScaler {
+  /** [[TransformOperation]] which scales input data of subtype of [[Vector]] 
with respect to
+    * the calculated mean and standard deviation of the training data. The 
mean and standard
+    * deviation of the resulting data is configurable.
+    *
+    * @tparam T Type of the input and output data which has to be a subtype of 
[[Vector]]
+    * @return
+    */
+  implicit def transformVectors[T <: Vector: BreezeVectorConverter: 
TypeInformation: ClassTag] = {
+    new TransformOperation[StandardScaler, T, T] {
+      override def transform(
+        instance: StandardScaler,
+        transformParameters: ParameterMap,
+        input: DataSet[T])
+      : DataSet[T] = {
 
-  case object Mean extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(0.0)
-  }
+        val resultingParameters = instance.parameters ++ transformParameters
+        val mean = resultingParameters(Mean)
+        val std = resultingParameters(Std)
 
-  case object Std extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(1.0)
+        instance.metricsOption match {
+          case Some(metrics) => {
+            input.map(new RichMapFunction[T, T]() {
+
+              var broadcastMean: linalg.Vector[Double] = null
+              var broadcastStd: linalg.Vector[Double] = null
+
+              override def open(parameters: Configuration): Unit = {
+                val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[
+                    (linalg.Vector[Double], linalg.Vector[Double])
+                  ]("broadcastedMetrics").get(0)
+                broadcastMean = broadcastedMetrics._1
+                broadcastStd = broadcastedMetrics._2
+              }
+
+              override def map(vector: T): T = {
+                var myVector = vector.asBreeze
+
+                myVector -= broadcastMean
+                myVector :/= broadcastStd
+                myVector = (myVector :* std) + mean
+                myVector.fromBreeze
+              }
+            }).withBroadcastSet(metrics, "broadcastedMetrics")
+          }
+
+          case None =>
+            throw new RuntimeException("The StandardScaler has not been fitted 
to the data. " +
+              "This is necessary to estimate the mean and standard deviation 
of the data.")
+        }
+      }
+    }
   }
 
-  def apply(): StandardScaler = {
-    new StandardScaler()
+  implicit val transformLabeledVectors = {
+    new TransformOperation[StandardScaler, LabeledVector, LabeledVector] {
+      override def transform(instance: StandardScaler, transformParameters: 
ParameterMap, input:
+      DataSet[LabeledVector]): DataSet[LabeledVector] = {
+        val resultingParameters = instance.parameters ++ transformParameters
+        val mean = resultingParameters(Mean)
+        val std = resultingParameters(Std)
+
+        instance.metricsOption match {
+          case Some(metrics) => {
+            input.map(new RichMapFunction[LabeledVector, LabeledVector]() {
+
+              var broadcastMean: linalg.Vector[Double] = null
+              var broadcastStd: linalg.Vector[Double] = null
+
+              override def open(parameters: Configuration): Unit = {
+                val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[
+                  (linalg.Vector[Double], linalg.Vector[Double])
+                  ]("broadcastedMetrics").get(0)
+                broadcastMean = broadcastedMetrics._1
+                broadcastStd = broadcastedMetrics._2
+              }
+
+              override def map(labeledVector: LabeledVector): LabeledVector = {
+                val LabeledVector(label, vector) = labeledVector
+                var breezeVector = vector.asBreeze
+
+                breezeVector -= broadcastMean
+                breezeVector :/= broadcastStd
+                breezeVector = (breezeVector :* std) + mean
+                LabeledVector(label, breezeVector.fromBreeze[Vector])
+              }
+            }).withBroadcastSet(metrics, "broadcastedMetrics")
+          }
+
+          case None =>
+            throw new RuntimeException("The StandardScaler has not been fitted 
to the data. " +
+              "This is necessary to estimate the mean and standard deviation 
of the data.")
+        }
+      }
+    }
   }
 }

Reply via email to