http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala new file mode 100644 index 0000000..aa1c4f9 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.math + +import org.scalatest.{Matchers, FlatSpec} + +class DenseVectorSuite extends FlatSpec with Matchers { + + behavior of "Flink's DenseVector" + + it should "contain the initialization data" in { + val data = Array.range(1, 10) + + val vector = DenseVector(data) + + assertResult(data.length)(vector.size) + + data.zip(vector.map(_._2)).foreach { case (expected, actual) => assertResult(expected)(actual) } + } + + it should "fail in case of an illegal element access" in { + val size = 10 + + val vector = DenseVector.zeros(size) + + intercept[IllegalArgumentException] { + vector(-1) + } + + intercept[IllegalArgumentException] { + vector(size) + } + } + + it should "calculate dot product with DenseVector" in { + val vec1 = DenseVector(Array(1, 0, 1)) + val vec2 = DenseVector(Array(0, 1, 0)) + + vec1.dot(vec2) should be(0) + } + + it should "calculate dot product with SparseVector" in { + val vec1 = DenseVector(Array(1, 0, 1)) + val vec2 = SparseVector.fromCOO(3, (0, 1), (1, 1)) + + vec1.dot(vec2) should be(1) + } + + it should "calculate dot product with SparseVector 2" in { + val vec1 = DenseVector(Array(1, 0, 1, 0, 0)) + val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1)) + + vec1.dot(vec2) should be(1) + } + + it should "fail in case of calculation dot product with different size vector" in { + val vec1 = DenseVector(Array(1, 0)) + val vec2 = DenseVector(Array(0)) + + intercept[IllegalArgumentException] { + vec1.dot(vec2) + } + } + + it should "calculate outer product with DenseVector correctly as DenseMatrix" in { + val vec1 = DenseVector(Array(1, 0, 1)) + val vec2 = DenseVector(Array(0, 1, 0)) + + vec1.outer(vec2) should be(an[DenseMatrix]) + vec1.outer(vec2) should be(DenseMatrix(3, 3, Array(0, 1, 0, 0, 0, 0, 0, 1, 0))) + } + + it should "calculate outer product with SparseVector correctly as SparseMatrix" in { + val vec1 = DenseVector(Array(1, 0, 1)) + val vec2 = SparseVector(3, Array(1), Array(1)) + + vec1.outer(vec2) 
should be(an[SparseMatrix]) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1))) + } + + it should "calculate outer product with a DenseVector correctly as DenseMatrix 2" in { + val vec1 = DenseVector(Array(1, 0, 1, 0, 0)) + val vec2 = DenseVector(Array(0, 0, 1, 0, 1)) + + val values = Array(0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + vec1.outer(vec2) should be(DenseMatrix(5, 5, values)) + } + + it should "calculate outer product with a SparseVector correctly as SparseMatrix 2" in { + val vec1 = DenseVector(Array(1, 0, 1, 0, 0)) + val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1)) + + val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0)) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries)) + } + + + + it should s"""calculate right outer product with DenseVector + |with one-dimensional unit DenseVector as identity""".stripMargin in { + val vec = DenseVector(Array(1, 0, 1, 0, 0)) + val unit = DenseVector(1) + + vec.outer(unit) should equal(DenseMatrix(vec.size, 1, vec.data)) + } + + it should s"""calculate right outer product with DenseVector + |with one-dimensional unit SparseVector as identity""".stripMargin in { + val vec = DenseVector(Array(1, 0, 1, 0, 0)) + val unit = SparseVector(1, Array(0), Array(1)) + + vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1))) + } + + it should s"""calculate left outer product for DenseVector + |with one-dimensional unit DenseVector as identity""".stripMargin in { + val vec = DenseVector(Array(1, 2, 3, 4, 5)) + val unit = DenseVector(1) + + unit.outer(vec) should equal(DenseMatrix(1, vec.size, vec.data)) + } + + it should s"""calculate left outer product for SparseVector + |with one-dimensional unit DenseVector as identity""".stripMargin in { + val vec = SparseVector(5, Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5)) + val unit = DenseVector(1) + + val entries = Iterable((0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), 
(0, 3, 4.0), (0, 4, 5.0)) + unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries)) + } + + it should s"""calculate outer product with DenseVector + |via multiplication if both vectors are one-dimensional""".stripMargin in { + val vec1 = DenseVector(Array(2)) + val vec2 = DenseVector(Array(3)) + + vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3)) + } + + it should s"""calculate outer product with SparseVector + |via multiplication if both vectors are one-dimensional""".stripMargin in { + val vec1 = DenseVector(Array(2)) + val vec2 = SparseVector(1, Array(0), Array(3)) + + vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3))) + } + + it should "calculate outer product with DenseVector via multiplication if both vectors " + + "are one-dimensional" in { + val vec1 = DenseVector(Array(2)) + val vec2 = DenseVector(Array(3)) + + vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3)) + } + + it should "calculate outer product with SparseVector via multiplication if both vectors are " + + "one-dimensioan" in { + val vec1 = DenseVector(Array(2)) + val vec2 = SparseVector(1, Array(0), Array(3)) + + vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3))) + } + + it should "calculate magnitude of vector" in { + val vec = DenseVector(Array(1, 4, 8)) + + vec.magnitude should be(9) + } + + it should "convert from and to Breeze vector" in { + import Breeze._ + + val flinkVector = DenseVector(1, 2, 3) + val breezeVector = breeze.linalg.DenseVector.apply(1.0, 2.0, 3.0) + + // use the vector BreezeVectorConverter + flinkVector should equal(breezeVector.fromBreeze) + + // use the sparse vector BreezeVectorConverter + flinkVector should equal(breezeVector.fromBreeze(DenseVector.denseVectorConverter)) + + flinkVector.asBreeze should be(breezeVector) + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala new file mode 100644 index 0000000..970ea4b --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseMatrixSuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.math + +import org.scalatest.{Matchers, FlatSpec} + +class SparseMatrixSuite extends FlatSpec with Matchers { + + behavior of "Flink's SparseMatrix" + + it should "contain a single element provided as a coordinate list (COO)" in { + val sparseMatrix = SparseMatrix.fromCOO(4, 4, (0, 0, 1)) + + sparseMatrix(0, 0) should equal(1) + + for(i <- 1 until sparseMatrix.size) { + val row = i / sparseMatrix.numCols + val col = i % sparseMatrix.numCols + + sparseMatrix(row, col) should equal(0) + } + } + + it should "be initialized from a coordinate list representation (COO)" in { + val data = List[(Int, Int, Double)]((0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17), + (3, 3, 88), (4 , 2, 99), (1, 4, 91), (3, 4, -1)) + + val numRows = 5 + val numCols = 5 + + val sparseMatrix = SparseMatrix.fromCOO(numRows, numCols, data) + + val expectedSparseMatrix = SparseMatrix.fromCOO(5, 5, (3, 4, 42), (2, 1, 17), (3, 3, 88), + (4, 2, 99), (1, 4, 91), (0, 0, 0), (0, 1, 0)) + + val expectedDenseMatrix = DenseMatrix.zeros(5, 5) + expectedDenseMatrix(3, 4) = 42 + expectedDenseMatrix(2, 1) = 17 + expectedDenseMatrix(3, 3) = 88 + expectedDenseMatrix(4, 2) = 99 + expectedDenseMatrix(1, 4) = 91 + + sparseMatrix should equal(expectedSparseMatrix) + sparseMatrix.equalsMatrix(expectedDenseMatrix) should be(true) + + sparseMatrix.toDenseMatrix.data.sameElements(expectedDenseMatrix.data) should be(true) + + val dataMap = data. + map{ case (row, col, value) => (row, col) -> value }. + groupBy{_._1}. 
+ mapValues{ + entries => + entries.map(_._2).sum + } + + for(row <- 0 until numRows; col <- 0 until numCols) { + sparseMatrix(row, col) should be(dataMap.getOrElse((row, col), 0)) + } + + // test access to defined field even though it was set to 0 + sparseMatrix(0, 1) = 10 + + // test that a non-defined field is not accessible + intercept[IllegalArgumentException]{ + sparseMatrix(1, 1) = 1 + } + } + + it should "fail when accessing zero elements or using invalid indices" in { + val data = List[(Int, Int, Double)]((0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17), + (3, 3, 88), (4 , 2, 99), (1, 4, 91), (3, 4, -1)) + + val numRows = 5 + val numCols = 5 + + val sparseMatrix = SparseMatrix.fromCOO(numRows, numCols, data) + + intercept[IllegalArgumentException] { + sparseMatrix(-1, 4) + } + + intercept[IllegalArgumentException] { + sparseMatrix(numRows, 0) + } + + intercept[IllegalArgumentException] { + sparseMatrix(0, numCols) + } + + intercept[IllegalArgumentException] { + sparseMatrix(3, -1) + } + } + + it should "fail when elements of the COO list have invalid indices" in { + intercept[IllegalArgumentException]{ + val sparseMatrix = SparseMatrix.fromCOO(5 ,5, (5, 0, 10), (0, 0, 0), (0, 1, 0), (3, 4, 43), + (2, 1, 17)) + } + + intercept[IllegalArgumentException]{ + val sparseMatrix = SparseMatrix.fromCOO(5, 5, (0, 0, 0), (0, 1, 0), (3, 4, 43), (2, 1, 17), + (-1, 4, 20)) + } + } + + it should "be copyable" in { + val sparseMatrix = SparseMatrix.fromCOO(4, 4, (0, 1, 2), (2, 3, 1), (2, 0, 42), (1, 3, 3)) + + val copy = sparseMatrix.copy + + sparseMatrix should equal(copy) + + copy(2, 3) = 2 + + sparseMatrix should not equal copy + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala new file mode 100644 index 0000000..15eed20 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/SparseVectorSuite.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.math + +import org.scalatest.{Matchers, FlatSpec} + +class SparseVectorSuite extends FlatSpec with Matchers { + + behavior of "Flink's SparseVector" + + it should "contain a single element provided as coordinate list (COO)" in { + val sparseVector = SparseVector.fromCOO(3, (0, 1)) + + sparseVector(0) should equal(1) + + for (index <- 1 until 3) { + sparseVector(index) should equal(0) + } + } + + it should "contain the initialization data provided as coordinate list (COO)" in { + val data = List[(Int, Double)]((0, 1), (2, 0), (4, 42), (0, 3)) + val size = 5 + val sparseVector = SparseVector.fromCOO(size, data) + + val expectedSparseVector = SparseVector.fromCOO(5, (0, 4), (4, 42), (2, 0)) + val expectedDenseVector = DenseVector.zeros(5) + + expectedDenseVector(0) = 4 + expectedDenseVector(4) = 42 + + sparseVector should equal(expectedSparseVector) + sparseVector.equalsVector(expectedDenseVector) should be(true) + + val denseVector = sparseVector.toDenseVector + + denseVector should equal(expectedDenseVector) + + val dataMap = data. + groupBy { + _._1 + }. 
+ mapValues { + entries => + entries.map(_._2).sum + } + + for (index <- 0 until size) { + sparseVector(index) should be(dataMap.getOrElse(index, 0)) + } + } + + it should "fail when accessing elements using an invalid index" in { + val sparseVector = SparseVector.fromCOO(5, (1, 1), (3, 3), (4, 4)) + + intercept[IllegalArgumentException] { + sparseVector(-1) + } + + intercept[IllegalArgumentException] { + sparseVector(5) + } + } + + it should "fail when the COO list contains elements with invalid indices" in { + intercept[IllegalArgumentException] { + val sparseVector = SparseVector.fromCOO(5, (0, 1), (-1, 34), (3, 2)) + } + + intercept[IllegalArgumentException] { + val sparseVector = SparseVector.fromCOO(5, (0, 1), (4, 3), (5, 1)) + } + } + + it should "be copyable" in { + val sparseVector = SparseVector.fromCOO(5, (0, 1), (4, 3), (3, 2)) + + val copy = sparseVector.copy + + sparseVector should equal(copy) + + copy(3) = 3 + + sparseVector should not equal (copy) + } + + it should "calculate dot product with SparseVector" in { + val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1)) + val vec2 = SparseVector.fromCOO(4, (1, 1), (3, 1)) + + vec1.dot(vec2) should be(0) + } + + it should "calculate dot product with SparseVector 2" in { + val vec1 = SparseVector.fromCOO(5, (2, 3), (4, 1)) + val vec2 = SparseVector.fromCOO(5, (4, 2), (2, 1)) + + vec1.dot(vec2) should be(5) + } + + it should "calculate dot product with DenseVector" in { + val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1)) + val vec2 = DenseVector(Array(0, 1, 0, 1)) + + vec1.dot(vec2) should be(0) + } + + it should "fail in case of calculation dot product with different size vector" in { + val vec1 = SparseVector.fromCOO(4, (0, 1), (2, 1)) + val vec2 = DenseVector(Array(0, 1, 0)) + + intercept[IllegalArgumentException] { + vec1.dot(vec2) + } + } + + it should "calculate outer product with SparseVector correctly as SparseMatrix" in { + val vec1 = SparseVector(3, Array(0, 2), Array(1, 1)) + val vec2 = 
SparseVector(3, Array(1), Array(1)) + + vec1.outer(vec2) should be(an[SparseMatrix]) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1))) + } + + it should "calculate outer product with DenseVector correctly as SparseMatrix" in { + val vec1 = SparseVector(3, Array(0, 2), Array(1, 1)) + val vec2 = DenseVector(Array(0, 1, 0)) + + vec1.outer(vec2) should be(an[SparseMatrix]) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(3, 3, (0, 1, 1), (2, 1, 1))) + } + + it should "calculate outer product with a DenseVector correctly as SparseMatrix 2" in { + val vec1 = SparseVector(5, Array(0, 2), Array(1, 1)) + val vec2 = DenseVector(Array(0, 0, 1, 0, 1)) + + val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0)) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries)) + } + + it should "calculate outer product with a SparseVector correctly as SparseMatrix 2" in { + val vec1 = SparseVector(5, Array(0, 2), Array(1, 1)) + val vec2 = SparseVector.fromCOO(5, (2, 1), (4, 1)) + + val entries = Iterable((0, 2, 1.0), (0, 4, 1.0), (2, 2, 1.0), (2, 4, 1.0)) + vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries)) + } + + + it should s"""calculate right outer product with DenseVector + |with one-dimensional unit DenseVector as identity""".stripMargin in { + val vec = SparseVector(5, Array(0, 2), Array(1, 1)) + val unit = DenseVector(1) + + vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1))) + } + + it should s"""calculate right outer product with DenseVector + |with one-dimensional unit SparseVector as identity""".stripMargin in { + val vec = SparseVector(5, Array(0, 2), Array(1, 1)) + val unit = SparseVector(1, Array(0), Array(1)) + + vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1))) + } + + it should s"""calculate left outer product for SparseVector + |with one-dimensional unit DenseVector as identity""".stripMargin in { + val vec = SparseVector(5, 
Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5)) + val unit = DenseVector(1) + + val entries = Iterable((0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (0, 3, 4.0), (0, 4, 5.0)) + unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries)) + } + + it should s"""calculate outer product with SparseVector + |via multiplication if both vectors are one-dimensional""".stripMargin in { + val vec1 = SparseVector.fromCOO(1, (0, 2)) + val vec2 = SparseVector.fromCOO(1, (0, 3)) + + vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3))) + } + + it should s"""calculate outer product with DenseVector + |via multiplication if both vectors are one-dimensional""".stripMargin in { + val vec1 = SparseVector(1, Array(0), Array(2)) + val vec2 = DenseVector(Array(3)) + + vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3))) + } + + it should "calculate magnitude of vector" in { + val vec = SparseVector.fromCOO(3, (0, 1), (1, 4), (2, 8)) + + vec.magnitude should be(9) + } + + it should "convert from and to Breeze vectors" in { + import Breeze._ + + val flinkVector = SparseVector.fromCOO(3, (1, 1.0), (2, 2.0)) + val breezeVector = breeze.linalg.SparseVector(3)(1 -> 1.0, 2 -> 2.0) + + // use the vector BreezeVectorConverter + flinkVector should equal(breezeVector.fromBreeze) + + // use the sparse vector BreezeVectorConverter + flinkVector should equal(breezeVector.fromBreeze(SparseVector.sparseVectorConverter)) + + flinkVector.asBreeze should be(breezeVector) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala new file mode 100644 index 0000000..1168d7f 
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/metrics/distances/DistanceMetricSuite.scala
@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.ml.metrics.distances

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.flink.ml.math.DenseVector
import org.scalatest.{FlatSpec, Matchers}

/** Behavior tests for the distance metric implementations, plus a Java-serialization check. */
class DistanceMetricSuite extends FlatSpec with Matchers {
  // tolerance for floating-point comparisons
  val EPSILON = 1e-8

  behavior of "Distance Measures"

  it should "calculate Euclidean distance correctly" in {
    val x = DenseVector(1, 9)
    val y = DenseVector(5, 6)

    // 3-4-5 right triangle
    EuclideanDistanceMetric().distance(x, y) should be(5)
  }

  it should "calculate square value of Euclidean distance correctly" in {
    val x = DenseVector(1, 9)
    val y = DenseVector(5, 6)

    SquaredEuclideanDistanceMetric().distance(x, y) should be(25)
  }

  it should "calculate Chebyshev distance correctly" in {
    val x = DenseVector(0, 3, 6)
    val y = DenseVector(0, 0, 0)

    // maximum per-coordinate difference
    ChebyshevDistanceMetric().distance(x, y) should be(6)
  }

  it should "calculate Cosine distance correctly" in {
    val x = DenseVector(1, 0)
    val y = DenseVector(2, 2)

    // 45 degrees between the vectors: 1 - cos(pi/4)
    CosineDistanceMetric().distance(x, y) should be((1 - math.sqrt(2) / 2) +- EPSILON)
  }

  it should "calculate Manhattan distance correctly" in {
    val x = DenseVector(0, 0, 0, 1, 1, 1)
    val y = DenseVector(1, 1, 1, 0, 0, 0)

    ManhattanDistanceMetric().distance(x, y) should be(6)
  }

  it should "calculate Minkowski distance correctly" in {
    val x = DenseVector(0, 0, 1, 1, 0)
    val y = DenseVector(1, 1, 0, 1, 2)

    // p = 3: (1 + 1 + 1 + 0 + 8)^(1/3)
    MinkowskiDistanceMetric(3).distance(x, y) should be(math.pow(11, 1.0 / 3) +- EPSILON)
  }

  it should "calculate Tanimoto distance correctly" in {
    val x = DenseVector(0, 1, 1)
    val y = DenseVector(1, 1, 0)

    // 1 - dot / (|x|^2 + |y|^2 - dot)
    TanimotoDistanceMetric().distance(x, y) should be(1 - (1.0 / (2 + 2 - 1)) +- EPSILON)
  }

  it should "be serialized" in {
    val metric = EuclideanDistanceMetric()

    // round-trip through Java serialization
    val bytesOut = new ByteArrayOutputStream()
    val objectOut = new ObjectOutputStream(bytesOut)

    objectOut.writeObject(metric)
    objectOut.close()

    val bytesIn = new ByteArrayInputStream(bytesOut.toByteArray)
    val objectIn = new ObjectInputStream(bytesIn)

    val restored = objectIn.readObject().asInstanceOf[DistanceMetric]

    restored should be(an[EuclideanDistanceMetric])
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
new file mode 100644
index 0000000..d84d017
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
@@ -0,0 +1,245 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under
one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.ml.optimization

import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.RegressionData._
import org.scalatest.{Matchers, FlatSpec}

import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

/** Integration tests for the (stochastic) gradient descent optimizers. */
class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {

  // TODO(tvas): Check results again once sampling operators are in place

  behavior of "The Stochastic Gradient Descent implementation"

  it should "correctly solve an L1 regularized regression problem" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = GradientDescentL1()
      .setStepsize(0.01)
      .setIterations(2000)
      .setLossFunction(lossFunction)
      .setRegularizationConstant(0.3)

    val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData)

    val resultDS = sgd.optimize(inputDS, None)

    val collectedWeights: Seq[WeightVector] = resultDS.collect()

    val solution: WeightVector = collectedWeights.head

    val intercept = solution.intercept
    val weights = solution.weights.asInstanceOf[DenseVector].data

    // each learned weight must be close to the expected regularized weight
    expectedRegWeights zip weights foreach {
      case (expectedWeight, weight) =>
        weight should be (expectedWeight +- 0.01)
    }

    intercept should be (expectedRegWeight0 +- 0.1)
  }

  it should "correctly perform one step with L2 regularization" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = GradientDescentL2()
      .setStepsize(0.1)
      .setIterations(1)
      .setLossFunction(lossFunction)
      .setRegularizationConstant(1.0)

    val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
    val initialWeights = new WeightVector(DenseVector(1.0), 1.0)
    val initialWeightsDS = env.fromElements(initialWeights)

    val resultDS = sgd.optimize(inputDS, Some(initialWeightsDS))

    val collectedWeights: Seq[WeightVector] = resultDS.collect()

    collectedWeights.size should equal(1)

    // destructure the single result for readability
    val WeightVector(updatedWeights, updatedIntercept) = collectedWeights.head

    updatedWeights(0) should be (0.5 +- 0.001)
    updatedIntercept should be (0.8 +- 0.01)
  }

  it should "estimate a linear function" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = SimpleGradientDescent()
      .setStepsize(1.0)
      .setIterations(800)
      .setLossFunction(lossFunction)

    val inputDS = env.fromCollection(data)
    val resultDS = sgd.optimize(inputDS, None)

    val collectedWeights: Seq[WeightVector] = resultDS.collect()

    collectedWeights.size should equal(1)

    val solution: WeightVector = collectedWeights.head

    val weights = solution.weights.asInstanceOf[DenseVector].data
    val weight0 = solution.intercept

    expectedWeights zip weights foreach {
      case (expectedWeight, weight) =>
        weight should be (expectedWeight +- 0.1)
    }
    weight0 should be (expectedWeight0 +- 0.1)
  }

  it should "estimate a linear function without an intercept" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = SimpleGradientDescent()
      .setStepsize(0.0001)
      .setIterations(100)
      .setLossFunction(lossFunction)

    val inputDS = env.fromCollection(noInterceptData)
    val resultDS = sgd.optimize(inputDS, None)

    val collectedWeights: Seq[WeightVector] = resultDS.collect()

    collectedWeights.size should equal(1)

    val solution: WeightVector = collectedWeights.head

    val weights = solution.weights.asInstanceOf[DenseVector].data
    val weight0 = solution.intercept

    expectedNoInterceptWeights zip weights foreach {
      case (expectedWeight, weight) =>
        weight should be (expectedWeight +- 0.1)
    }
    weight0 should be (expectedNoInterceptWeight0 +- 0.1)
  }

  it should "correctly perform one step of the algorithm with initial weights provided" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    val sgd = SimpleGradientDescent()
      .setStepsize(0.1)
      .setIterations(1)
      .setLossFunction(lossFunction)

    val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
    val initialWeights = new WeightVector(DenseVector(1.0), 1.0)
    val initialWeightsDS = env.fromElements(initialWeights)

    val resultDS = sgd.optimize(inputDS, Some(initialWeightsDS))

    val collectedWeights: Seq[WeightVector] = resultDS.collect()

    collectedWeights.size should equal(1)

    val solution: WeightVector = collectedWeights.head

    val updatedIntercept = solution.intercept
    val updatedWeight = solution.weights(0)

    updatedWeight should be (0.6 +- 0.01)
    updatedIntercept should be (0.8 +- 0.01)
  }

  it should "terminate early if the convergence criterion is reached" in {
    // TODO(tvas): We need a better way to check the convergence of the weights.
    // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that
    // can tell us whether we have converged and at which iteration

    val env = ExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)

    val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    // very loose threshold, so this optimizer should stop long before 800 iterations
    val sgdEarlyTerminate = SimpleGradientDescent()
      .setConvergenceThreshold(1e2)
      .setStepsize(1.0)
      .setIterations(800)
      .setLossFunction(lossFunction)

    val inputDS = env.fromCollection(data)

    val earlyDS = sgdEarlyTerminate.optimize(inputDS, None)

    val collectedEarly: Seq[WeightVector] = earlyDS.collect()

    collectedEarly.size should equal(1)

    val earlySolution: WeightVector = collectedEarly.head
    val weightsEarly = earlySolution.weights.asInstanceOf[DenseVector].data
    val weight0Early = earlySolution.intercept

    // reference run without a convergence criterion: runs all 800 iterations
    val sgdNoConvergence = SimpleGradientDescent()
      .setStepsize(1.0)
      .setIterations(800)
      .setLossFunction(lossFunction)

    val fullDS = sgdNoConvergence.optimize(inputDS, None)

    val collectedFull: Seq[WeightVector] = fullDS.collect()

    collectedFull.size should equal(1)

    val fullSolution: WeightVector = collectedFull.head
    val weightsFull = fullSolution.weights.asInstanceOf[DenseVector].data
    val weight0Full = fullSolution.intercept

    // Since the first optimizer was set to terminate early, its weights should be different
    weightsEarly zip weightsFull foreach {
      case (earlyWeight, fullWeight) =>
        fullWeight should not be (earlyWeight +- 0.1)
    }
    weight0Full should not be (weight0Early +- 0.1)
  }

  // TODO: Need more corner cases, see sklearn tests for SGD linear model

}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala new file mode 100644 index 0000000..4152188 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.ml.optimization

import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector
import org.scalatest.{Matchers, FlatSpec}

import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

/** Integration test for the generic loss function assembled from
  * [[SquaredLoss]] and [[LinearPrediction]].
  */
class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {

  behavior of "The optimization Loss Function implementations"

  it should "calculate squared loss and gradient correctly" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Squared loss on top of a linear model.
    val squaredLossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)

    // One observation x = 2 with target y = 1; model weight w = 1, intercept w0 = 1.
    val sample = LabeledVector(1.0, DenseVector(2))
    val model = new WeightVector(DenseVector(1.0), 1.0)

    // prediction = 1 * 2 + 1 = 3, residual = 2 => loss = 2, dL/dw = 2 * x = 4
    // (values pinned by the assertions below)
    val loss = squaredLossFunction.loss(sample, model)
    val gradient = squaredLossFunction.gradient(sample, model)

    loss should be (2.0 +- 0.001)
    gradient.weights(0) should be (4.0 +- 0.001)
  }
}
package org.apache.flink.ml.optimization

import org.apache.flink.ml.common.WeightVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.api.scala._
import org.apache.flink.test.util.FlinkTestBase

import org.scalatest.{Matchers, FlatSpec}

/** Integration tests for the linear prediction function of the optimization framework. */
class PredictionFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {

  behavior of "The optimization framework prediction functions"

  it should "correctly calculate linear predictions" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val linear = LinearPrediction

    // prediction = w . x + intercept = (-1 + 1 + 0.4 - 0.4 + 0) + 1 = 1
    val model = new WeightVector(DenseVector(-1.0, 1.0, 0.4, -0.4, 0.0), 1.0)
    val point = DenseVector(1.0, 1.0, 1.0, 1.0, 1.0)

    linear.predict(point, model) should be (1.0 +- 0.001)
  }

  it should "correctly calculate the gradient for linear predictions" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val linear = LinearPrediction

    val model = new WeightVector(DenseVector(-1.0, 1.0, 0.4, -0.4, 0.0), 1.0)
    val point = DenseVector(1.0, 1.0, 1.0, 1.0, 1.0)

    // For a linear model the gradient w.r.t. the weights is the feature vector
    // itself, and 1.0 w.r.t. the intercept (pinned by the assertion below).
    val gradient = linear.gradient(point, model)

    gradient shouldEqual WeightVector(DenseVector(1.0, 1.0, 1.0, 1.0, 1.0), 1.0)
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala new file mode 100644 index 0000000..a3ea086 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.ml.pipeline

import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.{ParameterMap, LabeledVector}
import org.apache.flink.ml.math._
import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.test.util.FlinkTestBase
import org.scalatest.{Matchers, FlatSpec}

/** Integration tests for chaining transformers and predictors into pipelines. */
class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
  behavior of "Flink's pipelines"

  it should "support chaining of compatible transformer" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val plainVectors = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
    val labeledVectors = List(
      LabeledVector(1.0, DenseVector(1.0, 1.0, 1.0)),
      LabeledVector(2.0, DenseVector(2.0, 2.0, 2.0)))

    val vectorData = env.fromCollection(plainVectors)
    val labeledVectorData = env.fromCollection(labeledVectors)

    // Expected output of standard scaling followed by a degree 2 polynomial expansion
    // (values pinned by the containment checks below).
    val expectedScaledVectorSet = Set(
      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0),
      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
    )

    val expectedScaledLabeledVectorSet = Set(
      LabeledVector(1.0, DenseVector(1.0, 3.0, 5.0, 9.0, 15.0, 25.0, -1.0, -3.0, -5.0)),
      LabeledVector(2.0, DenseVector(1.0, -1.0, -3.0, 1.0, 3.0, 9.0, 1.0, -1.0, -3.0))
    )

    val pipeline = StandardScaler().chainTransformer(PolynomialFeatures().setDegree(2))

    pipeline.fit(vectorData)

    val scaledVectors = pipeline.transform(vectorData).collect()
    val scaledLabeledVectors = pipeline.transform(labeledVectorData).collect()

    scaledVectors.size should be(expectedScaledVectorSet.size)
    scaledVectors.foreach { scaledVector =>
      expectedScaledVectorSet should contain(scaledVector)
    }

    scaledLabeledVectors.size should be(expectedScaledLabeledVectorSet.size)
    scaledLabeledVectors.foreach { scaledLabeledVector =>
      expectedScaledLabeledVectorSet should contain(scaledLabeledVector)
    }
  }

  it should "throw an exception when the pipeline operators are not compatible" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val scaler = StandardScaler()
    val mlr = MultipleLinearRegression()

    val vectorData = env.fromCollection(
      List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0)))
    val labeledData = env.fromCollection(List(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(2.0, DenseVector(2.0, 3.0)),
      LabeledVector(3.0, DenseVector(3.0, 4.0))))
    val doubleData = env.fromCollection(List(1.0, 2.0, 3.0))

    val pipeline = scaler.chainPredictor(mlr)

    // MLR cannot be trained on plain (unlabeled) DenseVectors.
    val exceptionFit = intercept[RuntimeException] {
      pipeline.fit(vectorData)
    }

    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
      "flink.ml.regression.MultipleLinearRegression which trains on a " +
      "DataSet[org.apache.flink.ml.math.DenseVector]")

    // fit the pipeline so that the StandardScaler won't fail when predict is called on the pipeline
    pipeline.fit(labeledData)

    // make sure that we have TransformOperation[StandardScaler, Double, Double]
    implicit val standardScalerDoubleTransform =
      new TransformDataSetOperation[StandardScaler, Double, Double] {
        override def transformDataSet(instance: StandardScaler, transformParameters: ParameterMap,
          input: DataSet[Double]): DataSet[Double] = {
          input
        }
      }

    // With the scaler handled above, the failure must now come from the predictor.
    val exceptionPredict = intercept[RuntimeException] {
      pipeline.predict(doubleData)
    }

    exceptionPredict.getMessage should equal("There is no PredictOperation defined for " +
      "org.apache.flink.ml.regression.MultipleLinearRegression which takes a " +
      "DataSet[Double] as input.")
  }

  it should "throw an exception when the input data is not supported" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val doubleData = env.fromCollection(List(1.0, 2.0, 3.0))

    val pipeline = StandardScaler().chainTransformer(PolynomialFeatures())

    // Neither fitting nor transforming a DataSet[Double] is supported by the scaler.
    val exceptionFit = intercept[RuntimeException] {
      pipeline.fit(doubleData)
    }

    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
      "flink.ml.preprocessing.StandardScaler which trains on a DataSet[Double]")

    val exceptionTransform = intercept[RuntimeException] {
      pipeline.transform(doubleData)
    }

    exceptionTransform.getMessage should equal("There is no TransformOperation defined for " +
      "org.apache.flink.ml.preprocessing.StandardScaler which takes a DataSet[Double] as input.")
  }

  it should "support multiple transformers and a predictor" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val trainingSet = List(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(2.0, DenseVector(2.0, 3.0)),
      LabeledVector(3.0, DenseVector(3.0, 4.0)))
    val testing = trainingSet.map(_.vector)
    val evaluation = trainingSet.map(x => (x.vector, x.label))

    val trainingData = env.fromCollection(trainingSet)
    val testingData = env.fromCollection(testing)
    val evaluationData = env.fromCollection(evaluation)

    // Chain five standard scalers in front of the linear regression predictor.
    val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler())
    val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler())
    val chainedScalers4 = chainedScalers3.chainTransformer(StandardScaler())
    val chainedScalers5 = chainedScalers4.chainTransformer(StandardScaler())

    val predictor = MultipleLinearRegression()
    val pipeline = chainedScalers5.chainPredictor(predictor)

    pipeline.fit(trainingData)

    val weightVector = predictor.weightsOption.get.collect().head

    // Reference weights for this training set (pinned by the assertions).
    weightVector.weights.valueIterator.foreach {
      _ should be(0.268050 +- 0.01)
    }
    weightVector.intercept should be(0.807924 +- 0.01)

    val predictionResult = pipeline.predict(testingData).collect()
    val evaluationResult = pipeline.evaluate(evaluationData).collect()

    predictionResult.size should be(testing.size)
    evaluationResult.size should be(evaluation.size)
  }

  it should "throw an exception when the input data is not supported by a predictor" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val doubleData = env.fromCollection(List(1.0, 2.0, 3.0))
    val svm = SVM()

    // SVM has no fit/predict operation for a DataSet[Double].
    intercept[RuntimeException] {
      svm.fit(doubleData)
    }

    intercept[RuntimeException] {
      svm.predict(doubleData)
    }
  }
}
package org.apache.flink.ml.preprocessing

import breeze.linalg.{max, min}
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.test.util.FlinkTestBase
import org.scalatest.{FlatSpec, Matchers}


/** Integration tests for the min/max range scaler. */
class MinMaxScalerITSuite
  extends FlatSpec
  with Matchers
  with FlinkTestBase {

  behavior of "Flink's MinMax Scaler"

  import MinMaxScalerData._

  it should "scale the vectors' values to be restricted in the [0.0,1.0] range" in {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromCollection(data)
    val scaler = MinMaxScaler()
    scaler.fit(dataSet)
    val scaled = scaler.transform(dataSet).collect()

    scaled.length should equal(data.length)

    // every feature value must fall inside the default [0, 1] target range
    scaled.foreach { vector =>
      vector.asBreeze.forall(v => v >= 0.0 && v <= 1.0) shouldEqual true
    }

    // per-feature minimum / maximum over the whole data set
    var trueMin = data.head.asBreeze
    var trueMax = data.head.asBreeze
    for (vector <- data.tail) {
      val breezeVector = vector.asBreeze
      trueMin = min(trueMin, breezeVector)
      trueMax = max(trueMax, breezeVector)
    }

    // the fitted metrics must match the true min/max vectors
    val fittedMetrics = scaler.metricsOption.get.collect()
    fittedMetrics.head shouldEqual(trueMin, trueMax)

    // a feature that takes only one value (zero range) is divided by 1 instead
    val range = trueMax - trueMin
    for (i <- 0 until range.size) {
      if (range(i) == 0.0) {
        range(i) = 1.0
      }
    }

    // verify each scaled vector against a manually scaled counterpart
    for ((original, actual) <- data.zip(scaled)) {
      var expected = original.asBreeze - trueMin
      expected :/= range
      expected = expected :* (1.0 - 0.0)

      expected.fromBreeze.toSeq should contain theSameElementsInOrderAs actual
    }
  }

  it should "scale vectors' values in the [-1.0,1.0] range" in {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromCollection(labeledData)
    val scaler = MinMaxScaler().setMin(-1.0)
    scaler.fit(dataSet)
    val scaled = scaler.transform(dataSet).collect()

    scaled.length should equal(labeledData.length)

    // every feature value must fall inside the user-specified [-1, 1] range
    scaled.foreach { labeledVector =>
      labeledVector.vector.asBreeze.forall(v => v >= -1.0 && v <= 1.0) shouldEqual true
    }

    // per-feature minimum / maximum over the whole data set
    var trueMin = labeledData.head.vector.asBreeze
    var trueMax = labeledData.head.vector.asBreeze
    for (labeledVector <- labeledData.tail) {
      val breezeVector = labeledVector.vector.asBreeze
      trueMin = min(trueMin, breezeVector)
      trueMax = max(trueMax, breezeVector)
    }

    // the fitted metrics must match the true min/max vectors
    val fittedMetrics = scaler.metricsOption.get.collect()
    fittedMetrics.head shouldEqual(trueMin, trueMax)

    // a feature that takes only one value (zero range) is divided by 1 instead
    val range = trueMax - trueMin
    for (i <- 0 until range.size) {
      if (range(i) == 0.0) {
        range(i) = 1.0
      }
    }

    // verify each scaled LabeledVector against a manually scaled counterpart;
    // labels must pass through unchanged
    for ((original, actual) <- labeledData.zip(scaled)) {
      var expected = original.vector.asBreeze - trueMin
      expected :/= range
      expected = (expected :* (1.0 + 1.0)) - 1.0

      original.label shouldEqual actual.label
      expected.fromBreeze.toSeq should contain theSameElementsInOrderAs actual.vector
    }
  }
}


object MinMaxScalerData {

  // (size, rooms) feature pairs; a constant 0.0 third feature is appended to
  // exercise the zero-range (single-valued feature) code path of the scaler.
  private val features: Seq[(Double, Double)] = Seq(
    (2104.00, 3.00), (1600.00, 3.00), (2400.00, 3.00), (1416.00, 2.00),
    (3000.00, 4.00), (1985.00, 4.00), (1534.00, 3.00), (1427.00, 3.00),
    (1380.00, 3.00), (1494.00, 3.00), (1940.00, 4.00), (2000.00, 3.00),
    (1890.00, 3.00), (4478.00, 5.00), (1268.00, 3.00), (2300.00, 4.00),
    (1320.00, 2.00), (1236.00, 3.00), (2609.00, 4.00), (3031.00, 4.00),
    (1767.00, 3.00), (1888.00, 2.00), (1604.00, 3.00), (1962.00, 4.00),
    (3890.00, 3.00), (1100.00, 3.00), (1458.00, 3.00), (2526.00, 3.00),
    (2200.00, 3.00), (2637.00, 3.00), (1839.00, 2.00), (1000.00, 1.00),
    (2040.00, 4.00), (3137.00, 3.00), (1811.00, 4.00), (1437.00, 3.00),
    (1239.00, 3.00), (2132.00, 4.00), (4215.00, 4.00), (2162.00, 4.00),
    (1664.00, 2.00), (2238.00, 3.00), (2567.00, 4.00), (1200.00, 3.00),
    (852.00, 2.00), (1852.00, 4.00), (1203.00, 3.00)
  )

  // one label per feature pair, in the same order
  private val labels: Seq[Double] = Seq(
    1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
    1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0
  )

  val data: Seq[Vector] = features.map {
    case (size, rooms) => DenseVector(Array(size, rooms, 0.0))
  }

  val labeledData: Seq[LabeledVector] = labels.zip(features).map {
    case (label, (size, rooms)) => LabeledVector(label, DenseVector(Array(size, rooms, 0.0)))
  }
}
package org.apache.flink.ml.preprocessing

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.test.util.FlinkTestBase
import org.scalatest.{FlatSpec, Matchers}

/** Integration tests for the polynomial feature transformer. */
class PolynomialFeaturesITSuite
  extends FlatSpec
  with Matchers
  with FlinkTestBase {

  behavior of "The polynomial base implementation"

  it should "map single element vectors to the polynomial vector space" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val input = Seq(
      LabeledVector(1.0, DenseVector(1)),
      LabeledVector(2.0, DenseVector(2))
    )

    // degree 3 expansion of a scalar x yields (x^3, x^2, x), keyed by label
    val expected = Map(
      1.0 -> DenseVector(1.0, 1.0, 1.0),
      2.0 -> DenseVector(8.0, 4.0, 2.0)
    )

    val transformer = PolynomialFeatures().setDegree(3)
    val result = transformer.transform(env.fromCollection(input)).collect()

    result.foreach { entry =>
      expected.contains(entry.label) should be(true)
      entry.vector should equal(expected(entry.label))
    }
  }

  it should "map vectors to the polynomial vector space" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val input = Seq(
      LabeledVector(1.0, DenseVector(2, 3)),
      LabeledVector(2.0, DenseVector(2, 3, 4))
    )

    // full degree 3 monomial basis of each input vector, keyed by label
    val expected = Map(
      1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0),
      2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
        9.0, 12.0, 16.0, 2.0, 3.0, 4.0)
    )

    val transformer = PolynomialFeatures().setDegree(3)
    val result = transformer.transform(env.fromCollection(input)).collect()

    result.foreach { entry =>
      expected.contains(entry.label) should be(true)
      entry.vector should equal(expected(entry.label))
    }
  }

  it should "return an empty vector if the max degree is zero" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val input = Seq(
      LabeledVector(1.0, DenseVector(2, 3)),
      LabeledVector(2.0, DenseVector(2, 3, 4))
    )

    // degree 0 produces no monomials at all
    val expected = Map(
      1.0 -> DenseVector(),
      2.0 -> DenseVector()
    )

    val transformer = PolynomialFeatures().setDegree(0)
    val result = transformer.transform(env.fromCollection(input)).collect()

    result.foreach { entry =>
      expected.contains(entry.label) should be(true)
      entry.vector should equal(expected(entry.label))
    }
  }
}
package org.apache.flink.ml.preprocessing

import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
import org.apache.flink.test.util.FlinkTestBase
import org.apache.flink.ml.math.Breeze._
import org.scalatest._


/** Integration tests for the standard (z-score) scaler. */
class StandardScalerITSuite
  extends FlatSpec
  with Matchers
  with FlinkTestBase {

  behavior of "Flink's Standard Scaler"

  import StandardScalerData._

  /** Asserts that every feature of `scaledVectors` has the given empirical
    * mean and standard deviation (within 1e-9).
    */
  def checkVectors(
      scaledVectors: Seq[FlinkVector],
      expectedMean: Double,
      expectedStd: Double): Unit = {
    scaledVectors.length should equal(data.length)

    val featureCount = scaledVectors(0).size
    var meanAcc: linalg.Vector[Double] = linalg.DenseVector.zeros(featureCount)
    var varAcc: linalg.Vector[Double] = linalg.DenseVector.zeros(featureCount)

    // empirical per-feature mean
    scaledVectors.foreach(vector => meanAcc += vector.asBreeze)
    meanAcc /= scaledVectors.size.asInstanceOf[Double]

    // empirical per-feature (population) standard deviation
    scaledVectors.foreach { vector =>
      val centered = vector.asBreeze - meanAcc
      varAcc += centered :* centered
    }
    varAcc /= scaledVectors.size.asInstanceOf[Double]
    val stdAcc = sqrt(varAcc)

    for (i <- 0 until featureCount) {
      meanAcc(i) should be(expectedMean +- 1e-9)
      stdAcc(i) should be(expectedStd +- 1e-9)
    }
  }

  it should "scale the vectors to have mean equal to 0 and std equal to 1" in {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromCollection(data)
    val scaler = StandardScaler()
    scaler.fit(dataSet)

    checkVectors(scaler.transform(dataSet).collect(), 0.0, 1.0)
  }

  it should "scale the vectors to have mean equal to 10 and standard deviation equal to 2" in {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromCollection(data)
    val scaler = StandardScaler().setMean(10.0).setStd(2.0)
    scaler.fit(dataSet)

    checkVectors(scaler.transform(dataSet).collect(), 10.0, 2.0)
  }

  it should "work with LabeledVector" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // labels are irrelevant here; only the vectors are scaled
    val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v))
    val scaler = StandardScaler()
    scaler.fit(dataSet)

    checkVectors(scaler.transform(dataSet).map(lv => lv.vector).collect(), 0.0, 1.0)
  }

  it should "work with (FlinkVector, Double) tuples" in {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromCollection(data).map(v => (v, 1.0))
    val scaler = StandardScaler()
    scaler.fit(dataSet)

    checkVectors(scaler.transform(dataSet).map(vl => vl._1).collect(), 0.0, 1.0)
  }
}

object StandardScalerData {

  // (size, rooms) observations used by all scaler tests
  val data: Seq[FlinkVector] = Seq(
    (2104.00, 3.00), (1600.00, 3.00), (2400.00, 3.00), (1416.00, 2.00),
    (3000.00, 4.00), (1985.00, 4.00), (1534.00, 3.00), (1427.00, 3.00),
    (1380.00, 3.00), (1494.00, 3.00), (1940.00, 4.00), (2000.00, 3.00),
    (1890.00, 3.00), (4478.00, 5.00), (1268.00, 3.00), (2300.00, 4.00),
    (1320.00, 2.00), (1236.00, 3.00), (2609.00, 4.00), (3031.00, 4.00),
    (1767.00, 3.00), (1888.00, 2.00), (1604.00, 3.00), (1962.00, 4.00),
    (3890.00, 3.00), (1100.00, 3.00), (1458.00, 3.00), (2526.00, 3.00),
    (2200.00, 3.00), (2637.00, 3.00), (1839.00, 2.00), (1000.00, 1.00),
    (2040.00, 4.00), (3137.00, 3.00), (1811.00, 4.00), (1437.00, 3.00),
    (1239.00, 3.00), (2132.00, 4.00), (4215.00, 4.00), (2162.00, 4.00),
    (1664.00, 2.00), (2238.00, 3.00), (2567.00, 4.00), (1200.00, 3.00),
    (852.00, 2.00), (1852.00, 4.00), (1203.00, 3.00)
  ).map { case (size, rooms) => DenseVector(Array(size, rooms)) }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation + +import org.scalatest._ + +import scala.language.postfixOps + +import org.apache.flink.api.scala._ +import org.apache.flink.test.util.FlinkTestBase + +class ALSITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + override val parallelism = 2 + + behavior of "The alternating least squares (ALS) implementation" + + it should "properly factorize a matrix" in { + import Recommendation._ + + val env = ExecutionEnvironment.getExecutionEnvironment + + val als = ALS() + .setIterations(iterations) + .setLambda(lambda) + .setBlocks(4) + .setNumFactors(numFactors) + + val inputDS = env.fromCollection(data) + + als.fit(inputDS) + + val testData = env.fromCollection(expectedResult.map{ + case (userID, itemID, rating) => (userID, itemID) + }) + + val predictions = als.predict(testData).collect() + + predictions.length should equal(expectedResult.length) + + val resultMap = expectedResult map { + case (uID, iID, value) => (uID, iID) -> value + } toMap + + predictions foreach { + case (uID, iID, value) => { + resultMap.isDefinedAt((uID, iID)) should be(true) + + value should be(resultMap((uID, iID)) +- 0.1) + } + } + + val risk = als.empiricalRisk(inputDS).collect().head + + risk should be(expectedEmpiricalRisk +- 1) + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala new file mode 100644 index 0000000..8d8e4b9 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.recommendation + +object Recommendation { + val iterations = 9 + val lambda = 1.0 + val numFactors = 5 + + val data: Seq[(Int, Int, Double)] = { + Seq( + (2,13,534.3937734561154), + (6,14,509.63176469621936), + (4,14,515.8246770897443), + (7,3,495.05234565105), + (2,3,532.3281786219485), + (5,3,497.1906356844367), + (3,3,512.0640508585093), + (10,3,500.2906742233019), + (1,4,521.9189079662882), + (2,4,515.0734651491396), + (1,7,522.7532725967008), + (8,4,492.65683825096403), + (4,8,492.65683825096403), + (10,8,507.03319667905413), + (7,1,522.7532725967008), + (1,1,572.2230209271174), + (2,1,563.5849190220224), + (6,1,518.4844061038742), + (9,1,529.2443732217674), + (8,1,543.3202505434103), + (7,2,516.0188923307859), + (1,2,563.5849190220224), + (1,11,515.1023793011227), + (8,2,536.8571133978352), + (2,11,507.90776961762225), + (3,2,532.3281786219485), + (5,11,476.24185144363304), + (4,2,515.0734651491396), + (4,11,469.92049343738233), + (3,12,509.4713776280098), + (4,12,494.6533165132021), + (7,5,482.2907867916308), + (6,5,477.5940040923741), + (4,5,480.9040684364228), + (1,6,518.4844061038742), + (6,6,470.6605085832807), + (8,6,489.6360564705307), + (4,6,472.74052954447046), + (7,9,482.5837650471611), + (5,9,487.00175463269863), + (9,9,500.69514584780944), + (4,9,477.71644808419325), + (7,10,485.3852917539852), + (8,10,507.03319667905413), + (3,10,500.2906742233019), + (5,15,488.08215944254437), + (6,15,480.16929757607346) + ) + } + + val expectedResult: Seq[(Int, Int, Double)] = { + Seq( + (2, 2, 526.1037), + (5, 9, 468.5680), + (10, 3, 484.8975), + (5, 13, 451.6228), + (1, 15, 493.4956), + (4, 11, 456.3862) + ) + } + + val expectedEmpiricalRisk = 505374.1877 +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala new file mode 100644 index 0000000..17b8a85 --- /dev/null +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.regression + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{ParameterMap, WeightVector} +import org.apache.flink.ml.preprocessing.PolynomialFeatures +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + +class MultipleLinearRegressionITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "The multipe linear regression implementation" + + it should "estimate a linear function" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val mlr = MultipleLinearRegression() + + import RegressionData._ + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 2.0) + parameters.add(MultipleLinearRegression.Iterations, 10) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + + val inputDS = env.fromCollection(data) + mlr.fit(inputDS, parameters) + + val weightList = mlr.weightsOption.get.collect() + + weightList.size should equal(1) + + val WeightVector(weights, intercept) = weightList.head + + expectedWeights.toIterator zip weights.valueIterator foreach { + case (expectedWeight, weight) => + weight should be (expectedWeight +- 1) + } + intercept should be (expectedWeight0 +- 0.4) + + val srs = mlr.squaredResidualSum(inputDS).collect().head + + srs should be (expectedSquaredResidualSum +- 2) + } + + it should "estimate a cubic function" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val polynomialBase = PolynomialFeatures() + val mlr = MultipleLinearRegression() + + val pipeline = polynomialBase.chainPredictor(mlr) + + val inputDS = env.fromCollection(RegressionData.polynomialData) + + val parameters = ParameterMap() + .add(PolynomialFeatures.Degree, 3) + .add(MultipleLinearRegression.Stepsize, 0.004) + .add(MultipleLinearRegression.Iterations, 100) + + pipeline.fit(inputDS, parameters) + + val weightList = 
mlr.weightsOption.get.collect() + + weightList.size should equal(1) + + val WeightVector(weights, intercept) = weightList.head + + RegressionData.expectedPolynomialWeights.toIterator.zip(weights.valueIterator) foreach { + case (expectedWeight, weight) => + weight should be(expectedWeight +- 0.1) + } + + intercept should be(RegressionData.expectedPolynomialWeight0 +- 0.1) + + val transformedInput = polynomialBase.transform(inputDS, parameters) + + val srs = mlr.squaredResidualSum(transformedInput).collect().head + + srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5) + } + + it should "make (mostly) correct predictions" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val mlr = MultipleLinearRegression() + + import RegressionData._ + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 1.0) + parameters.add(MultipleLinearRegression.Iterations, 10) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + + val inputDS = env.fromCollection(data) + val evaluationDS = inputDS.map(x => (x.vector, x.label)) + + mlr.fit(inputDS, parameters) + + val predictionPairs = mlr.evaluate(evaluationDS) + + val absoluteErrorSum = predictionPairs.collect().map{ + case (truth, prediction) => Math.abs(truth - prediction)}.sum + + absoluteErrorSum should be < 50.0 + } +}