http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
new file mode 100644
index 0000000..aa1c4f9
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import org.scalatest.{Matchers, FlatSpec}
+
+class DenseVectorSuite extends FlatSpec with Matchers {
+
+  behavior of "Flink's DenseVector"
+
+  it should "contain the initialization data" in {
+    val data = Array.range(1, 10)
+
+    val vector = DenseVector(data)
+
+    assertResult(data.length)(vector.size)
+
+    data.zip(vector.map(_._2)).foreach { case (expected, actual) => assertResult(expected)(actual) }
+  }
+
+  it should "fail in case of an illegal element access" in {
+    val size = 10
+
+    val vector = DenseVector.zeros(size)
+
+    intercept[IllegalArgumentException] {
+      vector(-1)
+    }
+
+    intercept[IllegalArgumentException] {
+      vector(size)
+    }
+  }
+
+  it should "calculate dot product with DenseVector" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
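+    // the vectors are orthogonal: 1 * 0 + 0 * 1 + 1 * 0 = 0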
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "calculate dot product with SparseVector" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = SparseVector.fromCOO(3, (0, 1), (1, 1))
+
+    vec1.dot(vec2) should be(1)
+  }
+
+  it should "calculate dot product with SparseVector 2" in {
+    val vec1 = DenseVector(Array(1, 0, 1, 0, 0))
+    val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1))
+
+    vec1.dot(vec2) should be(1)
+  }
+
+  it should "fail in case of calculation dot product with different size 
vector" in {
+    val vec1 = DenseVector(Array(1, 0))
+    val vec2 = DenseVector(Array(0))
+
+    intercept[IllegalArgumentException] {
+      vec1.dot(vec2)
+    }
+  }
+
+  it should "calculate outer product with DenseVector correctly as 
DenseMatrix" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
+    vec1.outer(vec2) should be(an[DenseMatrix])
+    vec1.outer(vec2) should be(DenseMatrix(3, 3, Array(0, 1, 0, 0, 0, 0, 0, 1, 0)))
+  }
+
+  it should "calculate outer product with SparseVector correctly as 
SparseMatrix" in {
+    val vec1 = DenseVector(Array(1, 0, 1))
+    val vec2 = SparseVector(3, Array(1), Array(1))
+
+    vec1.outer(vec2) should be(an[SparseMatrix])
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1)))
+  }
+
+  it should "calculate outer product with a DenseVector correctly as 
DenseMatrix 2" in {
+    val vec1 = DenseVector(Array(1, 0, 1, 0, 0))
+    val vec2 = DenseVector(Array(0, 0, 1, 0, 1))
+
+    val values = Array(0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    vec1.outer(vec2) should be(DenseMatrix(5, 5, values))
+  }
+
+  it should "calculate outer product with a SparseVector correctly as 
SparseMatrix 2" in {
+    val vec1 = DenseVector(Array(1, 0, 1, 0, 0))
+    val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1))
+
+    val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0))
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries))
+  }
+
+
+
+  it should s"""calculate right outer product with DenseVector
+               |with one-dimensional unit DenseVector as identity""".stripMargin in {
+    val vec = DenseVector(Array(1, 0, 1, 0, 0))
+    val unit = DenseVector(1)
+
+    vec.outer(unit) should equal(DenseMatrix(vec.size, 1, vec.data))
+  }
+
+  it should s"""calculate right outer product with DenseVector
+               |with one-dimensional unit SparseVector as identity""".stripMargin in {
+    val vec = DenseVector(Array(1, 0, 1, 0, 0))
+    val unit = SparseVector(1, Array(0), Array(1))
+
+    vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1)))
+  }
+
+  it should s"""calculate left outer product for DenseVector
+               |with one-dimensional unit DenseVector as identity""".stripMargin in {
+    val vec = DenseVector(Array(1, 2, 3, 4, 5))
+    val unit = DenseVector(1)
+
+    unit.outer(vec) should equal(DenseMatrix(1, vec.size, vec.data))
+  }
+
+  it should s"""calculate left outer product for SparseVector
+               |with one-dimensional unit DenseVector as identity""".stripMargin in {
+    val vec = SparseVector(5, Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5))
+    val unit = DenseVector(1)
+
+    val entries = Iterable((0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (0, 3, 4.0), (0, 4, 5.0))
+    unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries))
+  }
+
+  it should s"""calculate outer product with DenseVector
+               |via multiplication if both vectors are one-dimensional""".stripMargin in {
+    val vec1 = DenseVector(Array(2))
+    val vec2 = DenseVector(Array(3))
+
+    vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3))
+  }
+
+  it should s"""calculate outer product with SparseVector
+               |via multiplication if both vectors are one-dimensional""".stripMargin in {
+    val vec1 = DenseVector(Array(2))
+    val vec2 = SparseVector(1, Array(0), Array(3))
+
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3)))
+  }
+
+  it should "calculate outer product with DenseVector via multiplication if 
both vectors " +
+    "are one-dimensional" in {
+    val vec1 = DenseVector(Array(2))
+    val vec2 = DenseVector(Array(3))
+
+    vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3))
+  }
+
+  it should "calculate outer product with SparseVector via multiplication if 
both vectors are " +
+    "one-dimensioan" in {
+    val vec1 = DenseVector(Array(2))
+    val vec2 = SparseVector(1, Array(0), Array(3))
+
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3)))
+  }
+
+  it should "calculate magnitude of vector" in {
+    val vec = DenseVector(Array(1, 4, 8))
+
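+    // magnitude is the Euclidean norm: sqrt(1^2 + 4^2 + 8^2) = sqrt(81) = 9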
+    vec.magnitude should be(9)
+  }
+
+  it should "convert from and to Breeze vector" in {
+    import Breeze._
+
+    val flinkVector = DenseVector(1, 2, 3)
+    val breezeVector = breeze.linalg.DenseVector.apply(1.0, 2.0, 3.0)
+
+    // use the vector BreezeVectorConverter
+    flinkVector should equal(breezeVector.fromBreeze)
+
+    // use the sparse vector BreezeVectorConverter
+    flinkVector should equal(breezeVector.fromBreeze(DenseVector.denseVectorConverter))
+
+    flinkVector.asBreeze should be(breezeVector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
new file mode 100644
index 0000000..970ea4b
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import org.scalatest.{Matchers, FlatSpec}
+
+class SparseMatrixSuite extends FlatSpec with Matchers {
+
+  behavior of "Flink's SparseMatrix"
+
+  it should "contain a single element provided as a coordinate list (COO)" in {
+    val sparseMatrix = SparseMatrix.fromCOO(4, 4, (0, 0, 1))
+
+    sparseMatrix(0, 0) should equal(1)
+
+    for(i <- 1 until sparseMatrix.size) {
+      val row = i / sparseMatrix.numCols
+      val col = i % sparseMatrix.numCols
+
+      sparseMatrix(row, col) should equal(0)
+    }
+  }
+
+  it should "be initialized from a coordinate list representation (COO)" in {
+    val data = List[(Int, Int, Double)]((0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17),
+      (3, 3, 88), (4, 2, 99), (1, 4, 91), (3, 4, -1))
+
+    val numRows = 5
+    val numCols = 5
+
+    val sparseMatrix = SparseMatrix.fromCOO(numRows, numCols, data)
+
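+    // fromCOO sums up duplicate coordinates, so (3, 4, 43) and (3, 4, -1) collapse to (3, 4, 42)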
+    val expectedSparseMatrix = SparseMatrix.fromCOO(5, 5, (3, 4, 42), (2, 1, 17), (3, 3, 88),
+      (4, 2, 99), (1, 4, 91), (0, 0, 0), (0, 1, 0))
+
+    val expectedDenseMatrix = DenseMatrix.zeros(5, 5)
+    expectedDenseMatrix(3, 4) = 42
+    expectedDenseMatrix(2, 1) = 17
+    expectedDenseMatrix(3, 3) = 88
+    expectedDenseMatrix(4, 2) = 99
+    expectedDenseMatrix(1, 4) = 91
+
+    sparseMatrix should equal(expectedSparseMatrix)
+    sparseMatrix.equalsMatrix(expectedDenseMatrix) should be(true)
+
+    sparseMatrix.toDenseMatrix.data.sameElements(expectedDenseMatrix.data) should be(true)
+
+    val dataMap = data.
+      map { case (row, col, value) => (row, col) -> value }.
+      groupBy { _._1 }.
+      mapValues { entries => entries.map(_._2).sum }
+
+    for(row <- 0 until numRows; col <- 0 until numCols) {
+      sparseMatrix(row, col) should be(dataMap.getOrElse((row, col), 0))
+    }
+
+    // test access to defined field even though it was set to 0
+    sparseMatrix(0, 1) = 10
+
+    // test that a non-defined field is not accessible
+    intercept[IllegalArgumentException]{
+      sparseMatrix(1, 1) = 1
+    }
+  }
+
+  it should "fail when accessing zero elements or using invalid indices" in {
+    val data = List[(Int, Int, Double)]((0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17),
+      (3, 3, 88), (4, 2, 99), (1, 4, 91), (3, 4, -1))
+
+    val numRows = 5
+    val numCols = 5
+
+    val sparseMatrix = SparseMatrix.fromCOO(numRows, numCols, data)
+
+    intercept[IllegalArgumentException] {
+      sparseMatrix(-1, 4)
+    }
+
+    intercept[IllegalArgumentException] {
+      sparseMatrix(numRows, 0)
+    }
+
+    intercept[IllegalArgumentException] {
+      sparseMatrix(0, numCols)
+    }
+
+    intercept[IllegalArgumentException] {
+      sparseMatrix(3, -1)
+    }
+  }
+
+  it should "fail when elements of the COO list have invalid indices" in {
+    intercept[IllegalArgumentException]{
+      val sparseMatrix = SparseMatrix.fromCOO(5, 5, (5, 0, 10), (0, 0, 0), (0, 1, 0), (3, 4, 43),
+        (2, 1, 17))
+    }
+
+    intercept[IllegalArgumentException]{
+      val sparseMatrix = SparseMatrix.fromCOO(5, 5, (0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17),
+        (-1, 4, 20))
+    }
+  }
+
+  it should "be copyable" in {
+    val sparseMatrix = SparseMatrix.fromCOO(4, 4, (0, 1, 2), (2, 3, 1), (2, 0, 42), (1, 3, 3))
+
+    val copy = sparseMatrix.copy
+
+    sparseMatrix should equal(copy)
+
+    copy(2, 3) = 2
+
+    sparseMatrix should not equal copy
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
new file mode 100644
index 0000000..15eed20
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.math
+
+import org.scalatest.{Matchers, FlatSpec}
+
+class SparseVectorSuite extends FlatSpec with Matchers {
+
+  behavior of "Flink's SparseVector"
+
+  it should "contain a single element provided as coordinate list (COO)" in {
+    val sparseVector = SparseVector.fromCOO(3, (0, 1))
+
+    sparseVector(0) should equal(1)
+
+    for (index <- 1 until 3) {
+      sparseVector(index) should equal(0)
+    }
+  }
+
+  it should "contain the initialization data provided as coordinate list 
(COO)" in {
+    val data = List[(Int, Double)]((0, 1), (2, 0), (4, 42), (0, 3))
+    val size = 5
+    val sparseVector = SparseVector.fromCOO(size, data)
+
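+    // fromCOO sums up duplicate indices, so (0, 1) and (0, 3) collapse to the entry (0, 4)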
+    val expectedSparseVector = SparseVector.fromCOO(5, (0, 4), (4, 42), (2, 0))
+    val expectedDenseVector = DenseVector.zeros(5)
+
+    expectedDenseVector(0) = 4
+    expectedDenseVector(4) = 42
+
+    sparseVector should equal(expectedSparseVector)
+    sparseVector.equalsVector(expectedDenseVector) should be(true)
+
+    val denseVector = sparseVector.toDenseVector
+
+    denseVector should equal(expectedDenseVector)
+
+    val dataMap = data.
+      groupBy { _._1 }.
+      mapValues { entries => entries.map(_._2).sum }
+
+    for (index <- 0 until size) {
+      sparseVector(index) should be(dataMap.getOrElse(index, 0))
+    }
+  }
+
+  it should "fail when accessing elements using an invalid index" in {
+    val sparseVector = SparseVector.fromCOO(5, (1, 1), (3, 3), (4, 4))
+
+    intercept[IllegalArgumentException] {
+      sparseVector(-1)
+    }
+
+    intercept[IllegalArgumentException] {
+      sparseVector(5)
+    }
+  }
+
+  it should "fail when the COO list contains elements with invalid indices" in 
{
+    intercept[IllegalArgumentException] {
+      val sparseVector = SparseVector.fromCOO(5, (0, 1), (-1, 34), (3, 2))
+    }
+
+    intercept[IllegalArgumentException] {
+      val sparseVector = SparseVector.fromCOO(5, (0, 1), (4, 3), (5, 1))
+    }
+  }
+
+  it should "be copyable" in {
+    val sparseVector = SparseVector.fromCOO(5, (0, 1), (4, 3), (3, 2))
+
+    val copy = sparseVector.copy
+
+    sparseVector should equal(copy)
+
+    copy(3) = 3
+
+    sparseVector should not equal (copy)
+  }
+
+  it should "calculate dot product with SparseVector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = SparseVector.fromCOO(4, (1, 1), (3, 1))
+
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "calculate dot product with SparseVector 2" in {
+    val vec1 = SparseVector.fromCOO(5, (2, 3), (4, 1))
+    val vec2 = SparseVector.fromCOO(5, (4, 2), (2, 1))
+
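+    // only the common indices 2 and 4 contribute: 3 * 1 + 1 * 2 = 5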
+    vec1.dot(vec2) should be(5)
+  }
+
+  it should "calculate dot product with DenseVector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = DenseVector(Array(0, 1, 0, 1))
+
+    vec1.dot(vec2) should be(0)
+  }
+
+  it should "fail in case of calculation dot product with different size 
vector" in {
+    val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
+    intercept[IllegalArgumentException] {
+      vec1.dot(vec2)
+    }
+  }
+
+  it should "calculate outer product with SparseVector correctly as 
SparseMatrix" in {
+    val vec1 = SparseVector(3, Array(0, 2), Array(1, 1))
+    val vec2 = SparseVector(3, Array(1), Array(1))
+
+    vec1.outer(vec2) should be(an[SparseMatrix])
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1)))
+  }
+
+  it should "calculate outer product with DenseVector correctly as 
SparseMatrix" in {
+    val vec1 = SparseVector(3, Array(0, 2), Array(1, 1))
+    val vec2 = DenseVector(Array(0, 1, 0))
+
+    vec1.outer(vec2) should be(an[SparseMatrix])
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1)))
+  }
+
+  it should "calculate outer product with a DenseVector correctly as 
SparseMatrix 2" in {
+    val vec1 = SparseVector(5, Array(0, 2), Array(1, 1))
+    val vec2 = DenseVector(Array(0, 0, 1, 0, 1))
+
+    val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0))
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries))
+  }
+
+  it should "calculate outer product with a SparseVector correctly as 
SparseMatrix 2" in {
+    val vec1 = SparseVector(5, Array(0, 2), Array(1, 1))
+    val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1))
+
+    val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0))
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries))
+  }
+
+
+  it should s"""calculate right outer product with DenseVector
+               |with one-dimensional unit DenseVector as identity""".stripMargin in {
+    val vec = SparseVector(5, Array(0, 2), Array(1, 1))
+    val unit = DenseVector(1)
+
+    vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1)))
+  }
+
+  it should s"""calculate right outer product with DenseVector
+               |with one-dimensional unit SparseVector as identity""".stripMargin in {
+    val vec = SparseVector(5, Array(0, 2), Array(1, 1))
+    val unit = SparseVector(1, Array(0), Array(1))
+
+    vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1)))
+  }
+
+  it should s"""calculate left outer product for SparseVector
+               |with one-dimensional unit DenseVector as identity""".stripMargin in {
+    val vec = SparseVector(5, Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5))
+    val unit = DenseVector(1)
+
+    val entries = Iterable((0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (0, 3, 4.0), (0, 4, 5.0))
+    unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries))
+  }
+
+  it should s"""calculate outer product with SparseVector
+               |via multiplication if both vectors are one-dimensional""".stripMargin in {
+    val vec1 = SparseVector.fromCOO(1, (0, 2))
+    val vec2 = SparseVector.fromCOO(1, (0, 3))
+
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3)))
+  }
+
+  it should s"""calculate outer product with DenseVector
+               |via multiplication if both vectors are one-dimensional""".stripMargin in {
+    val vec1 = SparseVector(1, Array(0), Array(2))
+    val vec2 = DenseVector(Array(3))
+
+    vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3)))
+  }
+
+  it should "calculate magnitude of vector" in {
+    val vec = SparseVector.fromCOO(3, (0, 1), (1, 4), (2, 8))
+
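+    // sqrt(1^2 + 4^2 + 8^2) = sqrt(81) = 9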
+    vec.magnitude should be(9)
+  }
+
+  it should "convert from and to Breeze vectors" in {
+    import Breeze._
+
+    val flinkVector = SparseVector.fromCOO(3, (1, 1.0), (2, 2.0))
+    val breezeVector = breeze.linalg.SparseVector(3)(1 -> 1.0, 2 -> 2.0)
+
+    // use the vector BreezeVectorConverter
+    flinkVector should equal(breezeVector.fromBreeze)
+
+    // use the sparse vector BreezeVectorConverter
+    flinkVector should equal(breezeVector.fromBreeze(SparseVector.sparseVectorConverter))
+
+    flinkVector.asBreeze should be(breezeVector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
new file mode 100644
index 0000000..1168d7f
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.metrics.distances
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{FlatSpec, Matchers}
+
+class DistanceMetricSuite extends FlatSpec with Matchers {
+  val EPSILON = 1e-8
+
+  behavior of "Distance Measures"
+
+  it should "calculate Euclidean distance correctly" in {
+    val vec1 = DenseVector(1, 9)
+    val vec2 = DenseVector(5, 6)
+
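+    // sqrt((1 - 5)^2 + (9 - 6)^2) = sqrt(16 + 9) = 5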
+    EuclideanDistanceMetric().distance(vec1, vec2) should be(5)
+  }
+
+  it should "calculate square value of Euclidean distance correctly" in {
+    val vec1 = DenseVector(1, 9)
+    val vec2 = DenseVector(5, 6)
+
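+    // (1 - 5)^2 + (9 - 6)^2 = 16 + 9 = 25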
+    SquaredEuclideanDistanceMetric().distance(vec1, vec2) should be(25)
+  }
+
+  it should "calculate Chebyshev distance correctly" in {
+    val vec1 = DenseVector(0, 3, 6)
+    val vec2 = DenseVector(0, 0, 0)
+
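+    // the Chebyshev distance is the maximum coordinate-wise difference: max(0, 3, 6) = 6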
+    ChebyshevDistanceMetric().distance(vec1, vec2) should be(6)
+  }
+
+  it should "calculate Cosine distance correctly" in {
+    val vec1 = DenseVector(1, 0)
+    val vec2 = DenseVector(2, 2)
+
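+    // 1 - (vec1 dot vec2) / (|vec1| * |vec2|) = 1 - 2 / (1 * 2 * sqrt(2)) = 1 - sqrt(2) / 2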
+    CosineDistanceMetric().distance(vec1, vec2) should be((1 - math.sqrt(2) / 2) +- EPSILON)
+  }
+
+  it should "calculate Manhattan distance correctly" in {
+    val vec1 = DenseVector(0, 0, 0, 1, 1, 1)
+    val vec2 = DenseVector(1, 1, 1, 0, 0, 0)
+
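+    // the Manhattan distance sums the absolute differences: 6 * |1 - 0| = 6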
+    ManhattanDistanceMetric().distance(vec1, vec2) should be(6)
+  }
+
+  it should "calculate Minkowski distance correctly" in {
+    val vec1 = DenseVector(0, 0, 1, 1, 0)
+    val vec2 = DenseVector(1, 1, 0, 1, 2)
+
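+    // for p = 3: (|0-1|^3 + |0-1|^3 + |1-0|^3 + |1-1|^3 + |0-2|^3)^(1/3) = 11^(1/3)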
+    MinkowskiDistanceMetric(3).distance(vec1, vec2) should be(math.pow(11, 1.0 / 3) +- EPSILON)
+  }
+
+  it should "calculate Tanimoto distance correctly" in {
+    val vec1 = DenseVector(0, 1, 1)
+    val vec2 = DenseVector(1, 1, 0)
+
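+    // 1 - (x dot y) / (|x|^2 + |y|^2 - x dot y) = 1 - 1 / (2 + 2 - 1)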
+    TanimotoDistanceMetric().distance(vec1, vec2) should be(1 - (1.0 / (2 + 2 - 1)) +- EPSILON)
+  }
+
+  it should "be serialized" in {
+    val metric = EuclideanDistanceMetric()
+    val byteOutput = new ByteArrayOutputStream()
+    val output = new ObjectOutputStream(byteOutput)
+
+    output.writeObject(metric)
+    output.close()
+
+    val byteInput = new ByteArrayInputStream(byteOutput.toByteArray)
+    val input = new ObjectInputStream(byteInput)
+
+    val restoredMetric = input.readObject().asInstanceOf[DistanceMetric]
+
+    restoredMetric should be(an[EuclideanDistanceMetric])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
new file mode 100644
index 0000000..d84d017
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector}
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.regression.RegressionData._
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+  // TODO(tvas): Check results again once sampling operators are in place
+
+  behavior of "The Stochastic Gradient Descent implementation"
+
+  it should "correctly solve an L1 regularized regression problem" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgd = GradientDescentL1()
+      .setStepsize(0.01)
+      .setIterations(2000)
+      .setLossFunction(lossFunction)
+      .setRegularizationConstant(0.3)
+
+    val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData)
+
+    val weightDS = sgd.optimize(inputDS, None)
+
+    val weightList: Seq[WeightVector] = weightDS.collect()
+
+    val weightVector: WeightVector = weightList.head
+
+    val intercept = weightVector.intercept
+    val weights = weightVector.weights.asInstanceOf[DenseVector].data
+
+    expectedRegWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 0.01)
+    }
+
+    intercept should be (expectedRegWeight0 +- 0.1)
+  }
+
+  it should "correctly perform one step with L2 regularization" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgd = GradientDescentL2()
+      .setStepsize(0.1)
+      .setIterations(1)
+      .setLossFunction(lossFunction)
+      .setRegularizationConstant(1.0)
+
+    val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
+    val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
+    val currentWeightsDS = env.fromElements(currentWeights)
+
+    val weightDS = sgd.optimize(inputDS, Some(currentWeightsDS))
+
+    val weightList: Seq[WeightVector] = weightDS.collect()
+
+    weightList.size should equal(1)
+
+    val WeightVector(updatedWeights, updatedIntercept) = weightList.head
+
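+    // sanity check of the expected values: prediction = 1 * 2 + 1 = 3, error = 3 - 1 = 2, so the
+    // weight becomes 1 - 0.1 * (2 * 2 + 1 * 1) = 0.5 and the intercept 1 - 0.1 * 2 = 0.8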
+    updatedWeights(0) should be (0.5 +- 0.001)
+    updatedIntercept should be (0.8 +- 0.01)
+  }
+
+  it should "estimate a linear function" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgd = SimpleGradientDescent()
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(lossFunction)
+
+    val inputDS = env.fromCollection(data)
+    val weightDS = sgd.optimize(inputDS, None)
+
+    val weightList: Seq[WeightVector] = weightDS.collect()
+
+    weightList.size should equal(1)
+
+    val weightVector: WeightVector = weightList.head
+
+    val weights = weightVector.weights.asInstanceOf[DenseVector].data
+    val weight0 = weightVector.intercept
+
+    expectedWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 0.1)
+    }
+    weight0 should be (expectedWeight0 +- 0.1)
+  }
+
+  it should "estimate a linear function without an intercept" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgd = SimpleGradientDescent()
+      .setStepsize(0.0001)
+      .setIterations(100)
+      .setLossFunction(lossFunction)
+
+    val inputDS = env.fromCollection(noInterceptData)
+    val weightDS = sgd.optimize(inputDS, None)
+
+    val weightList: Seq[WeightVector] = weightDS.collect()
+
+    weightList.size should equal(1)
+
+    val weightVector: WeightVector = weightList.head
+
+    val weights = weightVector.weights.asInstanceOf[DenseVector].data
+    val weight0 = weightVector.intercept
+
+    expectedNoInterceptWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 0.1)
+    }
+    weight0 should be (expectedNoInterceptWeight0 +- 0.1)
+  }
+
+  it should "correctly perform one step of the algorithm with initial weights 
provided" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgd = SimpleGradientDescent()
+      .setStepsize(0.1)
+      .setIterations(1)
+      .setLossFunction(lossFunction)
+
+    val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
+    val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
+    val currentWeightsDS = env.fromElements(currentWeights)
+
+    val weightDS = sgd.optimize(inputDS, Some(currentWeightsDS))
+
+    val weightList: Seq[WeightVector] = weightDS.collect()
+
+    weightList.size should equal(1)
+
+    val weightVector: WeightVector = weightList.head
+
+    val updatedIntercept = weightVector.intercept
+    val updatedWeight = weightVector.weights(0)
+
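+    // prediction = 1 * 2 + 1 = 3, error = 3 - 1 = 2, so the weight becomes
+    // 1 - 0.1 * (2 * 2) = 0.6 and the intercept 1 - 0.1 * 2 = 0.8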
+    updatedWeight should be (0.6 +- 0.01)
+    updatedIntercept should be (0.8 +- 0.01)
+
+  }
+
+  it should "terminate early if the convergence criterion is reached" in {
+    // TODO(tvas): We need a better way to check the convergence of the weights.
+    // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that
+    // can tell us whether we have converged and at which iteration
+    // can tell us whether we have converged and at which iteration
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+    val sgdEarlyTerminate = SimpleGradientDescent()
+      .setConvergenceThreshold(1e2)
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(lossFunction)
+
+    val inputDS = env.fromCollection(data)
+
+    val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+    val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect()
+
+    weightListEarly.size should equal(1)
+
+    val weightVectorEarly: WeightVector = weightListEarly.head
+    val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data
+    val weight0Early = weightVectorEarly.intercept
+
+    val sgdNoConvergence = SimpleGradientDescent()
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(lossFunction)
+
+    val weightDSNoConvergence = sgdNoConvergence.optimize(inputDS, None)
+
+    val weightListNoConvergence: Seq[WeightVector] = weightDSNoConvergence.collect()
+
+    weightListNoConvergence.size should equal(1)
+
+    val weightVectorNoConvergence: WeightVector = weightListNoConvergence.head
+    val weightsNoConvergence = weightVectorNoConvergence.weights.asInstanceOf[DenseVector].data
+    val weight0NoConvergence = weightVectorNoConvergence.intercept
+
+    // Since the first optimizer was set to terminate early, its weights should be different
+    weightsEarly zip weightsNoConvergence foreach {
+      case (earlyWeight, weightNoConvergence) =>
+        weightNoConvergence should not be (earlyWeight +- 0.1)
+    }
+    weight0NoConvergence should not be (weight0Early +- 0.1)
+  }
+
+  // TODO: Need more corner cases, see sklearn tests for SGD linear model
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
new file mode 100644
index 0000000..4152188
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector}
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+  behavior of "The optimization Loss Function implementations"
+
+  it should "calculate squared loss and gradient correctly" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
+
+
+    val example = LabeledVector(1.0, DenseVector(2))
+    val weightVector = new WeightVector(DenseVector(1.0), 1.0)
+
+    val gradient = lossFunction.gradient(example, weightVector)
+    val loss = lossFunction.loss(example, weightVector)
+
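+    // with prediction 1 * 2 + 1 = 3: loss = 0.5 * (3 - 1)^2 = 2, weight gradient = (3 - 1) * 2 = 4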
+    loss should be (2.0 +- 0.001)
+
+    gradient.weights(0) should be (4.0 +- 0.001)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/PredictionFunctionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/PredictionFunctionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/PredictionFunctionITSuite.scala
new file mode 100644
index 0000000..6d2a239
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/PredictionFunctionITSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.WeightVector
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+import org.scalatest.{Matchers, FlatSpec}
+
+class PredictionFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+  behavior of "The optimization framework prediction functions"
+
+  it should "correctly calculate linear predictions" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val predFunction = LinearPrediction
+
+    val weightVector = new WeightVector(DenseVector(-1.0, 1.0, 0.4, -0.4, 0.0), 1.0)
+    val features = DenseVector(1.0, 1.0, 1.0, 1.0, 1.0)
+
+    val prediction = predFunction.predict(features, weightVector)
+
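+    // w dot x + b = (-1.0 + 1.0 + 0.4 - 0.4 + 0.0) + 1.0 = 1.0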
+    prediction should be (1.0 +- 0.001)
+  }
+
+  it should "correctly calculate the gradient for linear predictions" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val predFunction = LinearPrediction
+
+    val weightVector = new WeightVector(DenseVector(-1.0, 1.0, 0.4, -0.4, 0.0), 1.0)
+    val features = DenseVector(1.0, 1.0, 1.0, 1.0, 1.0)
+
+    val gradient = predFunction.gradient(features, weightVector)
+
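+    // for a linear model the gradient w.r.t. the weights is the feature vector itself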
+    gradient shouldEqual WeightVector(DenseVector(1.0, 1.0, 1.0, 1.0, 1.0), 1.0)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
new file mode 100644
index 0000000..a3ea086
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.classification.SVM
+import org.apache.flink.ml.common.{ParameterMap, LabeledVector}
+import org.apache.flink.ml.math._
+import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
+import org.apache.flink.ml.regression.MultipleLinearRegression
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+
+class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
+  behavior of "Flink's pipelines"
+
+  it should "support chaining of compatible transformer" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+    val lvData = List(LabeledVector(1.0, DenseVector(1.0, 1.0, 1.0)),
+      LabeledVector(2.0, DenseVector(2.0, 2.0, 2.0)))
+
+    val vectorData = env.fromCollection(vData)
+    val labeledVectorData = env.fromCollection(lvData)
+
+    val expectedScaledVectorSet = Set(
+      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0),
+      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+    )
+
+    val expectedScaledLabeledVectorSet = Set(
+      LabeledVector(1.0, DenseVector(1.0, 3.0, 5.0, 9.0, 15.0, 25.0, -1.0, -3.0, -5.0)),
+      LabeledVector(2.0, DenseVector(1.0, -1.0, -3.0, 1.0, 3.0, 9.0, 1.0, -1.0, -3.0))
+    )
+
+    val scaler = StandardScaler()
+    val polyFeatures = PolynomialFeatures().setDegree(2)
+
+    val pipeline = scaler.chainTransformer(polyFeatures)
+
+    pipeline.fit(vectorData)
+
+    val scaledVectorDataDS = pipeline.transform(vectorData)
+    val scaledLabeledVectorDataDS = pipeline.transform(labeledVectorData)
+
+    val scaledVectorData = scaledVectorDataDS.collect()
+    val scaledLabeledVectorData = scaledLabeledVectorDataDS.collect()
+
+    scaledVectorData.size should be(expectedScaledVectorSet.size)
+
+    for(scaledVector <- scaledVectorData){
+      expectedScaledVectorSet should contain(scaledVector)
+    }
+
+    scaledLabeledVectorData.size should be(expectedScaledLabeledVectorSet.size)
+
+    for(scaledLabeledVector <- scaledLabeledVectorData) {
+      expectedScaledLabeledVectorSet should contain(scaledLabeledVector)
+    }
+  }
+
+  it should "throw an exception when the pipeline operators are not 
compatible" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val scaler = StandardScaler()
+    val mlr = MultipleLinearRegression()
+
+    val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+    val vectorData = env.fromCollection(vData)
+    val labeledVectors = List(LabeledVector(1.0, DenseVector(1.0, 2.0)),
+      LabeledVector(2.0, DenseVector(2.0, 3.0)),
+      LabeledVector(3.0, DenseVector(3.0, 4.0)))
+    val labeledData = env.fromCollection(labeledVectors)
+    val doubles = List(1.0, 2.0, 3.0)
+    val doubleData = env.fromCollection(doubles)
+
+    val pipeline = scaler.chainPredictor(mlr)
+
+    val exceptionFit = intercept[RuntimeException] {
+      pipeline.fit(vectorData)
+    }
+
+    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
+      "flink.ml.regression.MultipleLinearRegression which trains on a " +
+      "DataSet[org.apache.flink.ml.math.DenseVector]")
+
+    // fit the pipeline so that the StandardScaler won't fail when predict is called on the pipeline
+    pipeline.fit(labeledData)
+
+    // make sure that we have TransformOperation[StandardScaler, Double, Double]
+    implicit val standardScalerDoubleTransform =
+      new TransformDataSetOperation[StandardScaler, Double, Double] {
+        override def transformDataSet(instance: StandardScaler, transformParameters: ParameterMap,
+          input: DataSet[Double]): DataSet[Double] = {
+          input
+        }
+      }
+
+    val exceptionPredict = intercept[RuntimeException] {
+      pipeline.predict(doubleData)
+    }
+
+    exceptionPredict.getMessage should equal("There is no PredictOperation defined for " +
+      "org.apache.flink.ml.regression.MultipleLinearRegression which takes a " 
+
+      "DataSet[Double] as input.")
+  }
+
+  it should "throw an exception when the input data is not supported" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dData = List(1.0, 2.0, 3.0)
+    val doubleData = env.fromCollection(dData)
+
+    val scaler = StandardScaler()
+    val polyFeatures = PolynomialFeatures()
+
+    val pipeline = scaler.chainTransformer(polyFeatures)
+
+    val exceptionFit = intercept[RuntimeException] {
+      pipeline.fit(doubleData)
+    }
+
+    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
+      "flink.ml.preprocessing.StandardScaler which trains on a DataSet[Double]")
+
+    val exceptionTransform = intercept[RuntimeException] {
+      pipeline.transform(doubleData)
+    }
+
+    exceptionTransform.getMessage should equal("There is no TransformOperation defined for " +
+      "org.apache.flink.ml.preprocessing.StandardScaler which takes a DataSet[Double] as input.")
+  }
+
+  it should "support multiple transformers and a predictor" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val data = List(LabeledVector(1.0, DenseVector(1.0, 2.0)),
+      LabeledVector(2.0, DenseVector(2.0, 3.0)),
+      LabeledVector(3.0, DenseVector(3.0, 4.0)))
+    val testing = data.map(_.vector)
+    val evaluation = data.map(x => (x.vector, x.label))
+
+    val trainingData = env.fromCollection(data)
+    val testingData = env.fromCollection(testing)
+    val evaluationData = env.fromCollection(evaluation)
+
+    val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler())
+    val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler())
+    val chainedScalers4 = chainedScalers3.chainTransformer(StandardScaler())
+    val chainedScalers5 = chainedScalers4.chainTransformer(StandardScaler())
+
+    val predictor = MultipleLinearRegression()
+    
+    val pipeline = chainedScalers5.chainPredictor(predictor)
+
+    pipeline.fit(trainingData)
+
+    val weightVector = predictor.weightsOption.get.collect().head
+
+    weightVector.weights.valueIterator.foreach{
+      _ should be (0.268050 +- 0.01)
+    }
+
+    weightVector.intercept should be (0.807924 +- 0.01)
+
+    val predictionDS = pipeline.predict(testingData)
+
+    val predictionResult = predictionDS.collect()
+
+    val evaluationDS = pipeline.evaluate(evaluationData)
+
+    val evaluationResult = evaluationDS.collect()
+
+    predictionResult.size should be(testing.size)
+    evaluationResult.size should be(evaluation.size)
+  }
+
+  it should "throw an exception when the input data is not supported by a 
predictor" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val data = List(1.0, 2.0, 3.0)
+    val doubleData = env.fromCollection(data)
+
+    val svm = SVM()
+
+    intercept[RuntimeException] {
+      svm.fit(doubleData)
+    }
+
+    intercept[RuntimeException] {
+      svm.predict(doubleData)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
new file mode 100644
index 0000000..75ac442
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg.{max, min}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class MinMaxScalerITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's MinMax Scaler"
+
+  import MinMaxScalerData._
+
+  it should "scale the vectors' values to be restricted in the [0.0,1.0] 
range" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+    val minMaxScaler = MinMaxScaler()
+    minMaxScaler.fit(dataSet)
+    val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+    scaledVectors.length should equal(data.length)
+
+    //ensure data lies in the user-specified range
+    for (vector <- scaledVectors) {
+      val test = vector.asBreeze.forall(fv => {
+        fv >= 0.0 && fv <= 1.0
+      })
+      test shouldEqual true
+    }
+
+    var expectedMin = data.head.asBreeze
+    var expectedMax = data.head.asBreeze
+
+    for (v <- data.tail) {
+      val tempVector = v.asBreeze
+      expectedMin = min(expectedMin, tempVector)
+      expectedMax = max(expectedMax, tempVector)
+    }
+
+    //ensure that estimated Min and Max vectors equal the expected ones
+    val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+    estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+    //handle the case where a feature takes only one value
+    val expectedRangePerFeature = (expectedMax - expectedMin)
+    for (i <- 0 until expectedRangePerFeature.size) {
+      if (expectedRangePerFeature(i) == 0.0) {
+        expectedRangePerFeature(i) = 1.0
+      }
+    }
+
+    // ensure that vectors were scaled correctly
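+    // each feature is mapped via (x - min) / (max - min) to the [0, 1] target range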
+    for (i <- 0 until data.length) {
+      var expectedVector = data(i).asBreeze - expectedMin
+      expectedVector :/= expectedRangePerFeature
+      expectedVector = expectedVector :* (1.0 - 0.0)
+
+      expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+    }
+  }
+
+  it should "scale vectors' values in the [-1.0,1.0] range" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(labeledData)
+    val minMaxScaler = MinMaxScaler().setMin(-1.0)
+    minMaxScaler.fit(dataSet)
+    val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+    scaledVectors.length should equal(labeledData.length)
+
+    //ensure data lies in the user-specified range
+    for (labeledVector <- scaledVectors) {
+      val test = labeledVector.vector.asBreeze.forall(lv => {
+        lv >= -1.0 && lv <= 1.0
+      })
+      test shouldEqual true
+    }
+
+    var expectedMin = labeledData.head.vector.asBreeze
+    var expectedMax = labeledData.head.vector.asBreeze
+
+    for (v <- labeledData.tail) {
+      val tempVector = v.vector.asBreeze
+      expectedMin = min(expectedMin, tempVector)
+      expectedMax = max(expectedMax, tempVector)
+    }
+
+    //ensure that estimated Min and Max vectors equal the expected ones
+    val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+    estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+    //handle the case where a feature takes only one value
+    val expectedRangePerFeature = (expectedMax - expectedMin)
+    for (i <- 0 until expectedRangePerFeature.size) {
+      if (expectedRangePerFeature(i) == 0.0) {
+        expectedRangePerFeature(i) = 1.0
+      }
+    }
+
+    // ensure that LabeledVectors were scaled correctly
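+    // each feature is mapped via 2 * (x - min) / (max - min) - 1 to the [-1, 1] target range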
+    for (i <- 0 until labeledData.length) {
+      var expectedVector = labeledData(i).vector.asBreeze - expectedMin
+      expectedVector :/= expectedRangePerFeature
+      expectedVector = (expectedVector :* (1.0 + 1.0)) - 1.0
+
+      labeledData(i).label shouldEqual scaledVectors(i).label
+      expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+        .vector
+    }
+  }
+}
+
+
+object MinMaxScalerData {
+
+  val data: Seq[Vector] = List(
+    DenseVector(Array(2104.00, 3.00, 0.0)),
+    DenseVector(Array(1600.00, 3.00, 0.0)),
+    DenseVector(Array(2400.00, 3.00, 0.0)),
+    DenseVector(Array(1416.00, 2.00, 0.0)),
+    DenseVector(Array(3000.00, 4.00, 0.0)),
+    DenseVector(Array(1985.00, 4.00, 0.0)),
+    DenseVector(Array(1534.00, 3.00, 0.0)),
+    DenseVector(Array(1427.00, 3.00, 0.0)),
+    DenseVector(Array(1380.00, 3.00, 0.0)),
+    DenseVector(Array(1494.00, 3.00, 0.0)),
+    DenseVector(Array(1940.00, 4.00, 0.0)),
+    DenseVector(Array(2000.00, 3.00, 0.0)),
+    DenseVector(Array(1890.00, 3.00, 0.0)),
+    DenseVector(Array(4478.00, 5.00, 0.0)),
+    DenseVector(Array(1268.00, 3.00, 0.0)),
+    DenseVector(Array(2300.00, 4.00, 0.0)),
+    DenseVector(Array(1320.00, 2.00, 0.0)),
+    DenseVector(Array(1236.00, 3.00, 0.0)),
+    DenseVector(Array(2609.00, 4.00, 0.0)),
+    DenseVector(Array(3031.00, 4.00, 0.0)),
+    DenseVector(Array(1767.00, 3.00, 0.0)),
+    DenseVector(Array(1888.00, 2.00, 0.0)),
+    DenseVector(Array(1604.00, 3.00, 0.0)),
+    DenseVector(Array(1962.00, 4.00, 0.0)),
+    DenseVector(Array(3890.00, 3.00, 0.0)),
+    DenseVector(Array(1100.00, 3.00, 0.0)),
+    DenseVector(Array(1458.00, 3.00, 0.0)),
+    DenseVector(Array(2526.00, 3.00, 0.0)),
+    DenseVector(Array(2200.00, 3.00, 0.0)),
+    DenseVector(Array(2637.00, 3.00, 0.0)),
+    DenseVector(Array(1839.00, 2.00, 0.0)),
+    DenseVector(Array(1000.00, 1.00, 0.0)),
+    DenseVector(Array(2040.00, 4.00, 0.0)),
+    DenseVector(Array(3137.00, 3.00, 0.0)),
+    DenseVector(Array(1811.00, 4.00, 0.0)),
+    DenseVector(Array(1437.00, 3.00, 0.0)),
+    DenseVector(Array(1239.00, 3.00, 0.0)),
+    DenseVector(Array(2132.00, 4.00, 0.0)),
+    DenseVector(Array(4215.00, 4.00, 0.0)),
+    DenseVector(Array(2162.00, 4.00, 0.0)),
+    DenseVector(Array(1664.00, 2.00, 0.0)),
+    DenseVector(Array(2238.00, 3.00, 0.0)),
+    DenseVector(Array(2567.00, 4.00, 0.0)),
+    DenseVector(Array(1200.00, 3.00, 0.0)),
+    DenseVector(Array(852.00, 2.00, 0.0)),
+    DenseVector(Array(1852.00, 4.00, 0.0)),
+    DenseVector(Array(1203.00, 3.00, 0.0))
+  )
+
+  val labeledData: Seq[LabeledVector] = List(
+    LabeledVector(1.0, DenseVector(Array(2104.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1600.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2400.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1416.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3000.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1985.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1534.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1427.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1380.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1494.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1940.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2000.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1890.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(4478.00, 5.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1268.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2300.00, 4.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1320.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1236.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2609.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3031.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1767.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1888.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1604.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1962.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3890.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1100.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1458.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2526.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2200.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2637.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1839.00, 2.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1000.00, 1.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2040.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3137.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1811.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1437.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1239.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2132.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(4215.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2162.00, 4.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1664.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2238.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2567.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1200.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(852.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1852.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1203.00, 3.00, 0.0)))
+  )
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
new file mode 100644
index 0000000..006db5f
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+class PolynomialFeaturesITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "The polynomial base implementation"
+
+  it should "map single element vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(1)),
+      LabeledVector(2.0, DenseVector(2))
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
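+    // For a single feature x and degree 3, the expansion is (x^3, x^2, x),
+    // e.g. 2.0 maps to (8.0, 4.0, 2.0).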
+    val expectedMap = List(
+      (1.0 -> DenseVector(1.0, 1.0, 1.0)),
+      (2.0 -> DenseVector(8.0, 4.0, 2.0))
+    ).toMap
+
+    val result = transformedDS.collect()
+
+    for (entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+
+  it should "map vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(2, 3)),
+      LabeledVector(2.0, DenseVector(2, 3, 4))
+    )
+
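+    // Degree 3 expands (x, y) to every monomial up to degree 3, highest
+    // degree first: (x^3, x^2*y, x*y^2, y^3, x^2, x*y, y^2, x, y).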
+    val expectedMap = List(
+      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
+      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
+        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
+    ).toMap
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect()
+
+    for (entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+
+  it should "return an empty vector if the max degree is zero" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(2, 3)),
+      LabeledVector(2.0, DenseVector(2, 3, 4))
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(0)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect()
+
+    val expectedMap = List(
+      (1.0 -> DenseVector()),
+      (2.0 -> DenseVector())
+    ).toMap
+
+    for (entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
new file mode 100644
index 0000000..5cd253d
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.apache.flink.ml.math.Breeze._
+import org.scalatest._
+
+
+class StandardScalerITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's Standard Scaler"
+
+  import StandardScalerData._
+
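+  /**
+   * Checks that every feature of the scaled vectors has the expected mean and
+   * standard deviation, up to a small numerical tolerance.
+   */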
+  def checkVectors(
+      scaledVectors: Seq[FlinkVector],
+      expectedMean: Double,
+      expectedStd: Double): Unit = {
+    scaledVectors.length should equal(data.length)
+
+    val numberOfFeatures = scaledVectors(0).size
+    var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
+    var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
+
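+    // Empirical per-feature mean of the scaled vectors.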
+    for (vector <- scaledVectors) {
+      scaledMean += vector.asBreeze
+    }
+    scaledMean /= scaledVectors.size.toDouble
+
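+    // Empirical per-feature standard deviation via the biased variance.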
+    for (vector <- scaledVectors) {
+      val temp = vector.asBreeze - scaledMean
+      scaledStd += temp :* temp
+    }
+    scaledStd /= scaledVectors.size.toDouble
+    scaledStd = sqrt(scaledStd)
+
+    for (i <- 0 until numberOfFeatures) {
+      scaledMean(i) should be(expectedMean +- 1e-9)
+      scaledStd(i) should be(expectedStd +- 1e-9)
+    }
+  }
+
+  it should "scale the vectors to have mean equal to 0 and std equal to 1" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).collect()
+
+    checkVectors(scaledVectors, 0.0, 1.0)
+  }
+
+  it should "scale the vectors to have mean equal to 10 and standard deviation 
equal to 2" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+    val scaler = StandardScaler().setMean(10.0).setStd(2.0)
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).collect()
+
+    checkVectors(scaledVectors, 10.0, 2.0)
+  }
+
+  it should "work with LabeledVector" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v))
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).map(lv => lv.vector).collect()
+
+    checkVectors(scaledVectors, 0.0, 1.0)
+  }
+
+  it should "work with (FlinkVector, Double) tuples" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data).map(v => (v, 1.0))
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect()
+
+    checkVectors(scaledVectors, 0.0, 1.0)
+  }
+}
+
+object StandardScalerData {
+
+  val data: Seq[FlinkVector] = List(
+    DenseVector(Array(2104.00, 3.00)),
+    DenseVector(Array(1600.00, 3.00)),
+    DenseVector(Array(2400.00, 3.00)),
+    DenseVector(Array(1416.00, 2.00)),
+    DenseVector(Array(3000.00, 4.00)),
+    DenseVector(Array(1985.00, 4.00)),
+    DenseVector(Array(1534.00, 3.00)),
+    DenseVector(Array(1427.00, 3.00)),
+    DenseVector(Array(1380.00, 3.00)),
+    DenseVector(Array(1494.00, 3.00)),
+    DenseVector(Array(1940.00, 4.00)),
+    DenseVector(Array(2000.00, 3.00)),
+    DenseVector(Array(1890.00, 3.00)),
+    DenseVector(Array(4478.00, 5.00)),
+    DenseVector(Array(1268.00, 3.00)),
+    DenseVector(Array(2300.00, 4.00)),
+    DenseVector(Array(1320.00, 2.00)),
+    DenseVector(Array(1236.00, 3.00)),
+    DenseVector(Array(2609.00, 4.00)),
+    DenseVector(Array(3031.00, 4.00)),
+    DenseVector(Array(1767.00, 3.00)),
+    DenseVector(Array(1888.00, 2.00)),
+    DenseVector(Array(1604.00, 3.00)),
+    DenseVector(Array(1962.00, 4.00)),
+    DenseVector(Array(3890.00, 3.00)),
+    DenseVector(Array(1100.00, 3.00)),
+    DenseVector(Array(1458.00, 3.00)),
+    DenseVector(Array(2526.00, 3.00)),
+    DenseVector(Array(2200.00, 3.00)),
+    DenseVector(Array(2637.00, 3.00)),
+    DenseVector(Array(1839.00, 2.00)),
+    DenseVector(Array(1000.00, 1.00)),
+    DenseVector(Array(2040.00, 4.00)),
+    DenseVector(Array(3137.00, 3.00)),
+    DenseVector(Array(1811.00, 4.00)),
+    DenseVector(Array(1437.00, 3.00)),
+    DenseVector(Array(1239.00, 3.00)),
+    DenseVector(Array(2132.00, 4.00)),
+    DenseVector(Array(4215.00, 4.00)),
+    DenseVector(Array(2162.00, 4.00)),
+    DenseVector(Array(1664.00, 2.00)),
+    DenseVector(Array(2238.00, 3.00)),
+    DenseVector(Array(2567.00, 4.00)),
+    DenseVector(Array(1200.00, 3.00)),
+    DenseVector(Array(852.00, 2.00)),
+    DenseVector(Array(1852.00, 4.00)),
+    DenseVector(Array(1203.00, 3.00))
+  )
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
new file mode 100644
index 0000000..9c241fd
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.scalatest._
+
+import scala.language.postfixOps
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class ALSITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The alternating least squares (ALS) implementation"
+
+  it should "properly factorize a matrix" in {
+    import Recommendation._
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(4)
+      .setNumFactors(numFactors)
+
+    val inputDS = env.fromCollection(data)
+
+    als.fit(inputDS)
+
+    val testData = env.fromCollection(expectedResult.map{
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = als.predict(testData).collect()
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt((uID, iID)) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 0.1)
+      }
+    }
+
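+    // The empirical risk is the regularized squared error of the learned
+    // factorization over the known ratings.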
+    val risk = als.empiricalRisk(inputDS).collect().head
+
+    risk should be(expectedEmpiricalRisk +- 1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
new file mode 100644
index 0000000..8d8e4b9
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+object Recommendation {
+  val iterations = 9
+  val lambda = 1.0
+  val numFactors = 5
+
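+  // Ratings given as (userID, itemID, rating) triples.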
+  val data: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2,13,534.3937734561154),
+      (6,14,509.63176469621936),
+      (4,14,515.8246770897443),
+      (7,3,495.05234565105),
+      (2,3,532.3281786219485),
+      (5,3,497.1906356844367),
+      (3,3,512.0640508585093),
+      (10,3,500.2906742233019),
+      (1,4,521.9189079662882),
+      (2,4,515.0734651491396),
+      (1,7,522.7532725967008),
+      (8,4,492.65683825096403),
+      (4,8,492.65683825096403),
+      (10,8,507.03319667905413),
+      (7,1,522.7532725967008),
+      (1,1,572.2230209271174),
+      (2,1,563.5849190220224),
+      (6,1,518.4844061038742),
+      (9,1,529.2443732217674),
+      (8,1,543.3202505434103),
+      (7,2,516.0188923307859),
+      (1,2,563.5849190220224),
+      (1,11,515.1023793011227),
+      (8,2,536.8571133978352),
+      (2,11,507.90776961762225),
+      (3,2,532.3281786219485),
+      (5,11,476.24185144363304),
+      (4,2,515.0734651491396),
+      (4,11,469.92049343738233),
+      (3,12,509.4713776280098),
+      (4,12,494.6533165132021),
+      (7,5,482.2907867916308),
+      (6,5,477.5940040923741),
+      (4,5,480.9040684364228),
+      (1,6,518.4844061038742),
+      (6,6,470.6605085832807),
+      (8,6,489.6360564705307),
+      (4,6,472.74052954447046),
+      (7,9,482.5837650471611),
+      (5,9,487.00175463269863),
+      (9,9,500.69514584780944),
+      (4,9,477.71644808419325),
+      (7,10,485.3852917539852),
+      (8,10,507.03319667905413),
+      (3,10,500.2906742233019),
+      (5,15,488.08215944254437),
+      (6,15,480.16929757607346)
+    )
+  }
+
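+  // Expected predictions for the user/item pairs queried in the test.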
+  val expectedResult: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2, 2, 526.1037),
+      (5, 9, 468.5680),
+      (10, 3, 484.8975),
+      (5, 13, 451.6228),
+      (1, 15, 493.4956),
+      (4, 11, 456.3862)
+    )
+  }
+
+  val expectedEmpiricalRisk = 505374.1877
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
new file mode 100644
index 0000000..17b8a85
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{ParameterMap, WeightVector}
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+class MultipleLinearRegressionITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "The multipe linear regression implementation"
+
+  it should "estimate a linear function" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val mlr = MultipleLinearRegression()
+
+    import RegressionData._
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    val inputDS = env.fromCollection(data)
+    mlr.fit(inputDS, parameters)
+
+    val weightList = mlr.weightsOption.get.collect()
+
+    weightList.size should equal(1)
+
+    val WeightVector(weights, intercept) = weightList.head
+
+    expectedWeights.toIterator zip weights.valueIterator foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 1)
+    }
+    intercept should be (expectedWeight0 +- 0.4)
+
+    val srs = mlr.squaredResidualSum(inputDS).collect().head
+
+    srs should be (expectedSquaredResidualSum +- 2)
+  }
+
+  it should "estimate a cubic function" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val polynomialBase = PolynomialFeatures()
+    val mlr = MultipleLinearRegression()
+
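+    // chainPredictor builds a pipeline: PolynomialFeatures first maps the
+    // input into polynomial space, then the linear model is fit on it.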
+    val pipeline = polynomialBase.chainPredictor(mlr)
+
+    val inputDS = env.fromCollection(RegressionData.polynomialData)
+
+    val parameters = ParameterMap()
+      .add(PolynomialFeatures.Degree, 3)
+      .add(MultipleLinearRegression.Stepsize, 0.004)
+      .add(MultipleLinearRegression.Iterations, 100)
+
+    pipeline.fit(inputDS, parameters)
+
+    val weightList = mlr.weightsOption.get.collect()
+
+    weightList.size should equal(1)
+
+    val WeightVector(weights, intercept) = weightList.head
+
+    RegressionData.expectedPolynomialWeights.toIterator.zip(weights.valueIterator) foreach {
+      case (expectedWeight, weight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+
+    intercept should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
+
+    val transformedInput = polynomialBase.transform(inputDS, parameters)
+
+    val srs = mlr.squaredResidualSum(transformedInput).collect().head
+
+    srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
+  }
+
+  it should "make (mostly) correct predictions" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val mlr = MultipleLinearRegression()
+
+    import RegressionData._
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 1.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    val inputDS = env.fromCollection(data)
+    val evaluationDS = inputDS.map(x => (x.vector, x.label))
+
+    mlr.fit(inputDS, parameters)
+
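+    // evaluate produces (truth, prediction) pairs for the test set.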
+    val predictionPairs = mlr.evaluate(evaluationDS)
+
+    val absoluteErrorSum = predictionPairs.collect().map{
+      case (truth, prediction) => Math.abs(truth - prediction)}.sum
+
+    absoluteErrorSum should be < 50.0
+  }
+}
