[GitHub] flink pull request: Ml branch
GitHub user fobeligi opened a pull request: https://github.com/apache/flink/pull/579

Ml branch

Implementation of StandardScaler and the respective tests for the FLINK-1809 JIRA issue.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fobeligi/incubator-flink ml-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/579.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #579

commit 96cb2f5676e945d7bc414987934e5c854de70584
Author: fobeligi
Date: 2015-04-01T20:31:38Z

    [FLINK-1809] Add Preprocessing package and Standardizer to ML-library

commit 2e8333b74e08f0c48bb58d36f2915a9ad832c456
Author: fobeligi
Date: 2015-04-03T16:52:35Z

    [FLINK-1809] Change implementation to use Breeze.linalg library and add tests for Standardizer

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973465

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.ml.preprocess
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.ml.math.{Vector, DenseVector}
    +import org.scalatest._
    +
    +

--- End diff --

One line break is sufficient.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973555

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +class StandardizerSuite extends FlatSpec with Matchers {

--- End diff --

Should be an ITSuite, since it starts a whole Flink cluster to execute the job. Thus, name it ```StandardizerITSuite```. That way, it will only be executed in the Maven verify phase.
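For context on the naming convention: builds commonly bind integration-style suites to the verify phase by filename pattern. A hypothetical Maven fragment illustrating the idea — the plugin choice and include patterns are assumptions for illustration, not taken from Flink's actual pom.xml:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-failsafe-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>integration-test</goal>
        <goal>verify</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <!-- only classes matching these patterns run during mvn verify -->
    <includes>
      <include>**/*ITCase.*</include>
      <include>**/*ITSuite.*</include>
    </includes>
  </configuration>
</plugin>
```

With such a setup, a plain ```mvn test``` skips the cluster-starting suites, which keeps the fast unit-test cycle cheap.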
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973639

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +class StandardizerSuite extends FlatSpec with Matchers {

--- End diff --

ML ITSuites should mix in the trait ```FlinkTestBase```. This trait makes sure that the right ```ExecutionEnvironment``` is set for the test case execution.
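Taken together, the two comments above amount to a skeleton roughly like the following. This is a sketch, not the actual PR code; in particular, the package of ```FlinkTestBase``` is an assumption:

```scala
package org.apache.flink.ml.preprocess

import org.apache.flink.api.scala._
// Assumed location of the trait; locate it in the project's test utilities.
import org.apache.flink.test.util.FlinkTestBase
import org.scalatest.{FlatSpec, Matchers}

// Renamed to *ITSuite so Maven only runs it in the verify phase, and
// FlinkTestBase is mixed in so the correct ExecutionEnvironment (backed by a
// local Flink mini-cluster) is provided to each test case.
class StandardizerITSuite extends FlatSpec with Matchers with FlinkTestBase {

  behavior of "Flink's Standard Scaler"

  it should "first center and then properly scale the given vectors" in {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // No env.setParallelism(...) here: FlinkTestBase configures it.
  }
}
```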
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973680

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +class StandardizerSuite extends FlatSpec with Matchers {
    +  behavior of "Flink's Standard Scaler"

--- End diff --

A line break is missing here.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973695

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +  behavior of "Flink's Standard Scaler"
    +  import StandardizerData._

--- End diff --

A line break is missing here.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973786

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +  it should "first center and then properly scale the given vectors" in {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(2)

--- End diff --

This will be done by ```FlinkTestBase``` for you. Only if you have to set the parallelism to a specific value should you override the inherited ```parallelism``` value from ```FlinkTestBase```.
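If a test genuinely depends on a specific degree of parallelism, the override the reviewer describes would look roughly like this. It is a sketch that assumes ```parallelism``` is a non-final member of ```FlinkTestBase```:

```scala
class StandardizerITSuite extends FlatSpec with Matchers with FlinkTestBase {

  // Override only when the test requires this exact value; otherwise rely on
  // the default chosen by FlinkTestBase.
  override val parallelism = 2
}
```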
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973815

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +    val dataSet = env.fromCollection(data)
    +    val transformer = new Standardizer()
    +    val scaledVectors = transformer.transform(dataSet).collect
    +
    +

--- End diff --

There are two line breaks here; one is sufficient.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973879

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +          scaledVector.apply(i) should be(expectedVector.apply(i) +- (0.01))

--- End diff --

The ```apply``` method call can be replaced by ```scaledVector(i)```.
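The suggestion works because in Scala ```x(i)``` is syntactic sugar for ```x.apply(i)``` on any object that defines an ```apply``` method. For example, using the ```DenseVector(Array(...))``` construction that the test data itself uses:

```scala
import org.apache.flink.ml.math.DenseVector

val scaledVector = DenseVector(Array(1.0, 2.0, 3.0))

// These two calls are identical; the second form is the idiomatic one.
val explicit = scaledVector.apply(1)
val sugared  = scaledVector(1)
assert(explicit == sugared)
```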
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973910

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +  it should "properly scale the given vectors without centering them first" in {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(2)

--- End diff --

Again, the parallelism is set explicitly.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973935

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +          scaledVector.apply(i) should be(expectedVector.apply(i) +- (0.01))

--- End diff --

Same with the ```apply``` method.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973958

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +  it should "properly center the given vectors without scaling them to unit variance" in {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(2)

--- End diff --

Explicit parallelism again.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27974019

--- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala ---

    +        1., 4., 3., 4., 3., 3., 4., 4., 4., 2., 3.,
    +        4., 3., 2., 4., 3.)))
    +
    +

--- End diff --

There are two line breaks here; one is sufficient.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27973978 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocess + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Vector, DenseVector} +import org.scalatest._ + + +class StandardizerSuite extends FlatSpec with Matchers { + behavior of "Flink's Standard Scaler" + import StandardizerData._ + it should "first center and then properly scale the given vectors" in { + +val env = ExecutionEnvironment.getExecutionEnvironment +env.setParallelism(2) + +val dataSet = env.fromCollection(data) +val transformer = new Standardizer() +val scaledVectors = transformer.transform(dataSet).collect + + +scaledVectors.length should equal(centeredExpectedVectors.length) + +scaledVectors zip centeredExpectedVectors foreach { + case (scaledVector, expectedVector) => { +for (i <- 0 until scaledVector.size) { + scaledVector.apply(i) should be(expectedVector.apply(i) +- (0.01)) +} + } +} + } + + it should "properly scale the given vectors without centering them first" in { + +val env = ExecutionEnvironment.getExecutionEnvironment +env.setParallelism(2) + +val dataSet = env.fromCollection(data) +val transformer = new Standardizer().setScaleMean(false) +val scaledVectors = transformer.transform(dataSet).collect + + +scaledVectors.length should equal(nonCenteredExpectedVectors.length) + +scaledVectors zip nonCenteredExpectedVectors foreach { + case (scaledVector, expectedVector) => { +for (i <- 0 until scaledVector.size) { + scaledVector.apply(i) should be(expectedVector.apply(i) +- (0.01)) +} + } +} + } + + it should "properly center the given vectors without scaling them to unit variance" in { + +val env = ExecutionEnvironment.getExecutionEnvironment +env.setParallelism(2) + +val dataSet = env.fromCollection(data) +val transformer = new Standardizer().setScaleStd(false) +val scaledVectors = transformer.transform(dataSet).collect + + +scaledVectors.length should equal(nonScaledExpectedVectors.length) + +scaledVectors zip nonScaledExpectedVectors foreach { + case (scaledVector, 
expectedVector) => { +for (i <- 0 until scaledVector.size) { + scaledVector.apply(i) should be(expectedVector.apply(i) +- (0.01)) --- End diff -- the ```apply``` method can be invoked implicitly, i.e. ```scaledVector(i)``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
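The tolerance checks above compare against pre-computed expected vectors; the expectation itself is just per-feature standardization (subtract the feature mean, divide by the feature standard deviation), which can be verified independently of Flink. A minimal standalone Python sketch (names are illustrative, not from the PR):

```python
def standardize(data):
    """Center each feature to mean 0, then scale it to (population) std 1."""
    n, dim = len(data), len(data[0])
    means = [sum(row[i] for row in data) / n for i in range(dim)]
    stds = [(sum((row[i] - means[i]) ** 2 for row in data) / n) ** 0.5
            for i in range(dim)]
    return [[(row[i] - means[i]) / stds[i] for i in range(dim)] for row in data]

scaled = standardize([[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]])
# each column of `scaled` now has mean 0 and variance 1
```

Checking the scaled columns against mean 0 / variance 1 to within a small tolerance mirrors what the `+- (0.01)` matchers in the suite do.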
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27974561 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. + */ --- End diff -- Example code could be helpful here. Also a parameter description would be really helpful. See the ScalaDocs of the other algorithms. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27974882 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. + */ +class Standardizer extends Transformer[Vector, Vector] with Serializable { --- End diff -- I would probably go with scikit-learn's name "StandardScaler". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27974938 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. + */ +class Standardizer extends Transformer[Vector, Vector] with Serializable { + + def setScaleMean(wm: Boolean): Standardizer = { --- End diff -- mean as parameter name should be enough --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27974957 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. 
+ */ +class Standardizer extends Transformer[Vector, Vector] with Serializable { + + def setScaleMean(wm: Boolean): Standardizer = { +parameters.add(ScaleMean, wm) +this + } + + def setScaleStd(std: Boolean): Standardizer = { --- End diff -- std as parameter name should be enough --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27975352 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. 
+ */ +class Standardizer extends Transformer[Vector, Vector] with Serializable { + + def setScaleMean(wm: Boolean): Standardizer = { +parameters.add(ScaleMean, wm) +this + } + + def setScaleStd(std: Boolean): Standardizer = { +parameters.add(ScaleStd, std) +this + } + + override def transform(input: DataSet[Vector], parameters: ParameterMap): + DataSet[Vector] = { +val resultingParameters = this.parameters ++ parameters +val sMean = resultingParameters(ScaleMean) +val sStd = resultingParameters(ScaleStd) + +input.map { --- End diff -- A map operation won't be enough to calculate the mean and the standard deviation of a ```DataSet``` of ```Vector```. Each vector represents one datapoint consisting of several features. In order to calculate the mean, for example, for every feature, you have to sum up the vectors and divide by the number of vectors. This gives you a vector of feature means. You can do this with a reduce operation. Having the mean and std, you can then broadcast this value to a ```RichMapFunction``` and scale the vectors accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
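The reduce-then-broadcast approach the reviewer describes — sum the vectors element-wise, divide by the count to get a vector of feature means, then broadcast that vector to a map — can be sketched outside Flink like this (plain Python standing in for the DataSet API; in Flink the fold below would be a `reduce` on the `DataSet` and the result a broadcast variable feeding a `RichMapFunction`):

```python
from functools import reduce

vectors = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]

# Reduce step: element-wise sum of all vectors plus a running count.
summed, count = reduce(
    lambda acc, v: ([a + b for a, b in zip(acc[0], v)], acc[1] + 1),
    vectors,
    ([0.0] * len(vectors[0]), 0),
)

# One vector of per-feature means, ready to be "broadcast" to the map step.
mean_vector = [s / count for s in summed]

# Map step: center every data point using the broadcast mean vector.
centered = [[x - m for x, m in zip(v, mean_vector)] for v in vectors]
```

The key point matching the review comment: a plain `map` sees one vector at a time and cannot compute a dataset-wide mean; the aggregation has to happen in a separate reduce whose result is then made available to every mapper.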
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r27975475 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocess + +import breeze.linalg._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{Vector} +import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd} + + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has mean zero and standard deviation equal to one. 
+ */ +class Standardizer extends Transformer[Vector, Vector] with Serializable { + + def setScaleMean(wm: Boolean): Standardizer = { +parameters.add(ScaleMean, wm) +this + } + + def setScaleStd(std: Boolean): Standardizer = { +parameters.add(ScaleStd, std) +this + } + + override def transform(input: DataSet[Vector], parameters: ParameterMap): + DataSet[Vector] = { +val resultingParameters = this.parameters ++ parameters +val sMean = resultingParameters(ScaleMean) +val sStd = resultingParameters(ScaleStd) + +input.map { + vector => { +scaleVector(vector, sMean, sStd) + } +} + } + + /** Scales the vector to zero mean and unit variance +* +* @param vector +* @param sMean +* @param sStd +*/ + private def scaleVector(vector: Vector, sMean: Boolean, sStd: Boolean): Vector = { --- End diff -- Maybe we can generalize the StandardScaler by allowing the user to define a different std than 1 and a different mean than 0. This could then be the parameter value. The default value would then be ```(0, 1)```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
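The generalization suggested here maps each feature value x to ((x − μ)/σ) · targetStd + targetMean, so the default (0, 1) reduces to plain standardization. A quick numeric sketch (Python; the function and parameter names are hypothetical, not from the PR):

```python
def rescale(x, mu, sigma, target_mean=0.0, target_std=1.0):
    """Standardize x against its feature's (mu, sigma), then shift and scale
    so the feature ends up with the requested target mean and std."""
    return (x - mu) / sigma * target_std + target_mean

# A value half a standard deviation above its feature mean (mu=2.0, sigma=0.5),
# retargeted to a distribution with mean 10 and std 2:
y = rescale(2.25, mu=2.0, sigma=0.5, target_mean=10.0, target_std=2.0)
# -> 11.0, i.e. still half a (target) standard deviation above the (target) mean
```

With the defaults the call degenerates to `(x - mu) / sigma`, which is exactly what the zero-mean/unit-variance scaler computes.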
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-90936379 Hi Faye, I made some annotations. Ping me when you've addressed them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-91286399 Maybe we could name the package ```org.apache.flink.ml.preprocessing``` instead of ```org.apache.flink.ml.preprocess```. What do you think? If you include the JIRA issue in the PR title, like "[FLINK-1809] Adds standard scaler to ml library", then all our comments here will be automatically added to the corresponding JIRA issue.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412028 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import java.lang.Iterable +import breeze.linalg +import breeze.linalg.DenseVector +import breeze.numerics.sqrt +import breeze.numerics.sqrt._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import org.apache.flink.util.Collector + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one --- End diff -- Mean and standard deviation are configurable, right? Default is (0,1)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412749 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import java.lang.Iterable +import breeze.linalg +import breeze.linalg.DenseVector +import breeze.numerics.sqrt +import breeze.numerics.sqrt._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import org.apache.flink.util.Collector + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has a user-specified mean and standard deviation. 
+ * + * This transformer can be prepended to all [[Transformer]] and + * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of + * [[Vector]]. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(data) + * + * val transformer = StandardScaler().setMean(10.0).setStd(2.0) + * + * transformer.transform(trainingDS) + * }}} + * + * =Parameters= + * + * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * equal to 1 + */ +class StandardScaler extends Transformer[Vector, Vector] with Serializable { + + def setMean(mu: Double): StandardScaler = { +parameters.add(Mean, mu) +this + } + + def setStd(std: Double): StandardScaler = { +parameters.add(Std, std) +this + } + + override def transform(input: DataSet[Vector], parameters: ParameterMap): + DataSet[Vector] = { +val resultingParameters = this.parameters ++ parameters +val mean = resultingParameters(Mean) +val std = resultingParameters(Std) + +val featureMetrics = extractFeatureMetrics(input) + +input.map(new RichMapFunction[Vector, Vector]() { + + var broadcastMeanSet: Vector = null + var broadcastStdSet: Vector = null + + override def open(parameters: Configuration): Unit = { +val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(Vector, + Vector)]("broadcastedMetrics").get(0) +broadcastMeanSet = broadcastedMetrics._1 +broadcastStdSet = broadcastedMetrics._2 + } + + override def map(vector: Vector): Vector = { +var myVector = vector.asBreeze + +myVector :-= broadcastMeanSet.asBreeze +myVector :/= broadcastStdSet.asBreeze +myVector = (myVector :* std) :+ mean +return myVector.fromBreeze + } +}).withBroadcastSet(featureMetrics, "broadcastedMetrics") + } + + /** Calculates in one pass over the data the features' mean and standard deviation. 
+* For the calculation of the standard deviation with one pass over the data, +* the Youngs & Cramer algorithm was used +* +* @param dataSet The data set for which we want to calculate mean and variance +* @return DataSet of Tuple2 +*/ + private def extractFeatureMetrics(dataSet: DataSet[Vector]) = { --- End diff -- Explicitly declaring the return type of the Scala method would be clearer here.
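The Youngs and Cramer one-pass update mentioned in the docstring keeps a running sum and a running sum of squared deviations, avoiding a second pass over the data. A scalar Python sketch of the update (the Flink code applies the same recurrence element-wise to vectors of features):

```python
def youngs_cramer(xs):
    """One-pass mean and population variance via the Youngs & Cramer update."""
    t = xs[0]   # running sum T_j
    s = 0.0     # running sum of squared deviations S_j
    for j, x in enumerate(xs[1:], start=2):
        t += x
        s += (j * x - t) ** 2 / (j * (j - 1))
    n = len(xs)
    return t / n, s / n  # mean, population variance

mean, var = youngs_cramer([1.0, 2.0, 3.0, 4.0])
# -> mean 2.5, population variance 1.25
```

Because both statistics come out of a single traversal, the method pairs naturally with a distributed reduce: partial (count, T, S) triples from each partition can be merged before the final division.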
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412374 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import java.lang.Iterable +import breeze.linalg +import breeze.linalg.DenseVector +import breeze.numerics.sqrt +import breeze.numerics.sqrt._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import org.apache.flink.util.Collector + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has a user-specified mean and standard deviation. 
+ * + * This transformer can be prepended to all [[Transformer]] and + * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of + * [[Vector]]. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(data) + * + * val transformer = StandardScaler().setMean(10.0).setStd(2.0) + * + * transformer.transform(trainingDS) + * }}} + * + * =Parameters= + * + * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * equal to 1 + */ +class StandardScaler extends Transformer[Vector, Vector] with Serializable { + + def setMean(mu: Double): StandardScaler = { +parameters.add(Mean, mu) +this + } + + def setStd(std: Double): StandardScaler = { +parameters.add(Std, std) +this + } + + override def transform(input: DataSet[Vector], parameters: ParameterMap): + DataSet[Vector] = { +val resultingParameters = this.parameters ++ parameters +val mean = resultingParameters(Mean) +val std = resultingParameters(Std) + +val featureMetrics = extractFeatureMetrics(input) + +input.map(new RichMapFunction[Vector, Vector]() { + + var broadcastMeanSet: Vector = null + var broadcastStdSet: Vector = null + + override def open(parameters: Configuration): Unit = { +val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(Vector, + Vector)]("broadcastedMetrics").get(0) +broadcastMeanSet = broadcastedMetrics._1 +broadcastStdSet = broadcastedMetrics._2 + } + + override def map(vector: Vector): Vector = { +var myVector = vector.asBreeze + +myVector :-= broadcastMeanSet.asBreeze +myVector :/= broadcastStdSet.asBreeze --- End diff -- By storing the ```broadcastMeanSet``` and ```broadcastStdSet``` directly as ```BreezeVector``` we can avoid the repetitive wrapping of ```Vector``` into a ```BreezeVector``` instance. 
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412288 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.preprocessing + +import java.lang.Iterable +import breeze.linalg +import breeze.linalg.DenseVector +import breeze.numerics.sqrt +import breeze.numerics.sqrt._ +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import org.apache.flink.util.Collector + +/** Scales observations, so that all features have mean equal to zero + * and standard deviation equal to one + * + * This transformer takes a a Vector of values and maps it into the + * scaled Vector that each feature has a user-specified mean and standard deviation. 
+ * + * This transformer can be prepended to all [[Transformer]] and + * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of + * [[Vector]]. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(data) + * + * val transformer = StandardScaler().setMean(10.0).setStd(2.0) + * + * transformer.transform(trainingDS) + * }}} + * + * =Parameters= + * + * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * equal to 1 + */ +class StandardScaler extends Transformer[Vector, Vector] with Serializable { + + def setMean(mu: Double): StandardScaler = { +parameters.add(Mean, mu) +this + } + + def setStd(std: Double): StandardScaler = { +parameters.add(Std, std) +this + } + + override def transform(input: DataSet[Vector], parameters: ParameterMap): + DataSet[Vector] = { +val resultingParameters = this.parameters ++ parameters +val mean = resultingParameters(Mean) +val std = resultingParameters(Std) + +val featureMetrics = extractFeatureMetrics(input) + +input.map(new RichMapFunction[Vector, Vector]() { + + var broadcastMeanSet: Vector = null + var broadcastStdSet: Vector = null --- End diff -- Maybe ```broadcastMean``` and ```broadcastStd``` are better names, because these variables are no sets. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412920 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* @param dataSet The data set for which we want to calculate mean and variance +* @return DataSet of Tuple2 --- End diff -- That's Java notation. We could write: A data set containing a single tuple of two vectors ```(meanVector, stdVector)```. The first vector repr
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412479 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* For the calculation of the Standard deviation with one pas over the data, --- End diff -- typo: pass
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28412522 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* For the calculation of the Standard deviation with one pas over the data, +* the Youngs & Cramer algorithm was used --- End diff -- Maybe we can put the link http://www.cs.yale.edu/publications/techreports/tr222.pdf here, too.
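The Youngs & Cramer algorithm referenced here computes mean and standard deviation in a single pass by maintaining a running sum `t` and a running sum of squared deviations `s`. A plain-Python sketch of the update rule (sample standard deviation shown; a population variant would divide by `n` instead of `n - 1`):

```python
import math

def mean_and_std(xs):
    """One pass over the data: Youngs & Cramer update of the running
    sum t and the running sum of squared deviations s.
    Returns (mean, sample standard deviation)."""
    n, t, s = 0, 0.0, 0.0
    for x in xs:
        n += 1
        t += x
        if n > 1:
            d = n * x - t  # n*x_n - T_n, with T_n already including x_n
            s += d * d / (n * (n - 1))
    return t / n, math.sqrt(s / (n - 1))

print(mean_and_std([1.0, 2.0, 3.0, 4.0]))  # (2.5, 1.2909944487358056)
```

Because `s` is accumulated incrementally, partial results from different partitions can also be merged, which is what makes the algorithm suitable for a distributed combine step.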
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28580607 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.{Vector, DenseVector} +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest._ + +class StandardScalerITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's Standard Scaler" + + import StandardScalerData._ + + it should "first center and then properly scale the given vectors" in { + +val env = ExecutionEnvironment.getExecutionEnvironment + +val dataSet = env.fromCollection(data) +val transformer = new StandardScaler() +val scaledVectors = transformer.transform(dataSet).collect --- End diff -- Calculating the mean and std manually and checking against (0, 1) could be helpful as well. The thing is that nobody knows how the vectors look like after the transformation and thus can only rely on the correctness of the expected vectors. 
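The check the reviewer suggests — computing the mean and std of the transformed data manually and comparing against (0, 1) — can be sketched in plain Python (illustrative single-feature data, population std assumed; the real test would do this per feature on the collected `DataSet`):

```python
import math

data = [1.0, 2.0, 3.0, 4.0, 5.0]  # illustrative single feature
mean = sum(data) / len(data)
std = math.sqrt(sum((x - mean) ** 2 for x in data) / len(data))
scaled = [(x - mean) / std for x in data]

# After centering and scaling, the mean should be ~0 and the std ~1.
scaled_mean = sum(scaled) / len(scaled)
scaled_std = math.sqrt(sum((x - scaled_mean) ** 2 for x in scaled) / len(scaled))
assert abs(scaled_mean) < 1e-9 and abs(scaled_std - 1.0) < 1e-9
```

This property holds for any input data, so the test does not depend on hand-computed expected vectors.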
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28580779 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* @param dataSet The data set for which we want to calculate mean and variance +* @return DataSet of Tuple2 +*/ + private def extractFeatureMetrics(dataSet: DataSet[Vector]) = { + +val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, (Doub
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28580943 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* @param dataSet The data set for which we want to calculate mean and variance +* @return DataSet of Tuple2 +*/ + private def extractFeatureMetrics(dataSet: DataSet[Vector]) = { + +val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, (Doub
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28580329 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (same StandardScaler.scala context as quoted above) +* @param dataSet The data set for which we want to calculate mean and variance +* @return DataSet of Tuple2 +*/ + private def extractFeatureMetrics(dataSet: DataSet[Vector]) = { + +val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, (Doub
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28580480 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala --- (same StandardScalerITSuite.scala context as quoted above) +scaledVectors.length should equal(expectedVectors.length) + +scaledVectors zip expectedVectors foreach { + case (scaledVector, expectedVector) => { +for (i <- 0 until scaledVector.size) { + scaledVector(i) should be(expectedVector(i) +- (0.01)) +} + } +} + } +} --- End diff -- Maybe a test case with a different mean and std could be helpful to test that the parameters are correctly passed.
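The suggested extra test — scaling to a non-default mean and std and checking the result — has a simple invariant to assert: after scaling to target mean 10 and std 2, the transformed data should itself have mean 10 and std 2. A plain-Python sketch (illustrative data, population std assumed, not the ScalaTest code):

```python
import math

def scale_to(data, target_mean, target_std):
    """Scale a single feature column to the given target mean/std."""
    mean = sum(data) / len(data)
    std = math.sqrt(sum((x - mean) ** 2 for x in data) / len(data))
    return [((x - mean) / std) * target_std + target_mean for x in data]

scaled = scale_to([1.0, 2.0, 3.0], target_mean=10.0, target_std=2.0)
m = sum(scaled) / len(scaled)
s = math.sqrt(sum((x - m) ** 2 for x in scaled) / len(scaled))
assert abs(m - 10.0) < 1e-9 and abs(s - 2.0) < 1e-9
```

A test built on this invariant exercises the `setMean`/`setStd` parameter path without relying on hand-computed expected vectors.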
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-93954174 Hi Faye, the code looks much better now. I had only some minor comments. What would be really awesome is to create some documentation for the website. You can look at flink/docs/ml/ to see how the other components are documented. It's basically the Scala doc of the class. Another thing to think about is the way we use Breeze operators. At the moment you have specified all operators with a preceding colon. This is perfectly fine, but I personally use it only where it's necessary. For example, if you multiply a vector with a double you can also write ```vector * double```. Of course this operation is also an element-wise operation, but IMHO it's easier to parse a formula if you use the colon syntax only when both operands are vectors. That way, another person can directly spot that the operation is an element-wise vector-vector multiplication. What do you think?
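The distinction the comment draws — Breeze's colon-prefixed operators like `:*` for element-wise vector-vector products versus plain `*` for vector-scalar scaling — can be illustrated without Breeze in plain Python (hypothetical helper names, just to show the two shapes of operation):

```python
def times_scalar(v, a):
    # vector * scalar: every component is multiplied by the same number
    return [x * a for x in v]

def times_elementwise(v, w):
    # Breeze-style v :* w: components are paired up,
    # so both operands must have the same length
    return [x * y for x, y in zip(v, w)]

print(times_scalar([1.0, 2.0, 3.0], 2.0))                   # [2.0, 4.0, 6.0]
print(times_elementwise([1.0, 2.0, 3.0], [4.0, 5.0, 6.0]))  # [4.0, 10.0, 18.0]
```

Reserving the colon syntax for the second case makes it immediately visible in a formula which multiplications pair up two vectors component by component.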
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28605205

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+
+/** Scales observations so that all features have a user-specified mean and standard deviation.
+  * By default the StandardScaler transformer uses mean=0.0 and std=1.0.
+  *
+  * This transformer takes a Vector of values and maps it to a scaled Vector in which each
+  * feature has the user-specified mean and standard deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[Vector] = env.fromCollection(data)
+  *
+  *   val transformer = StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *   transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of the transformed data set; by default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default
+  *   equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable {
+
+  def setMean(mu: Double): StandardScaler = {
+    parameters.add(Mean, mu)
+    this
+  }
+
+  /**
+    * @param std the user-specified std value. In case the user gives 0.0 as input,
+    *            the std is kept at its default value: 1.0.
+    * @return the StandardScaler object with its std value set to the user-specified value
+    */
+  def setStd(std: Double): StandardScaler = {
+    if (std == 0.0) {
+      return this
+    }
+    parameters.add(Std, std)
+    this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap): DataSet[Vector] = {
+    val resultingParameters = this.parameters ++ parameters
+    val mean = resultingParameters(Mean)
+    val std = resultingParameters(Std)
+
+    val featureMetrics = extractFeatureMetrics(input)
+
+    input.map(new RichMapFunction[Vector, Vector]() {
+
+      var broadcastMean: linalg.Vector[Double] = null
+      var broadcastStd: linalg.Vector[Double] = null
+
+      override def open(parameters: Configuration): Unit = {
+        val broadcastedMetrics = getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
+          linalg.Vector[Double])]("broadcastedMetrics").get(0)
+        broadcastMean = broadcastedMetrics._1
+        broadcastStd = broadcastedMetrics._2
+      }
+
+      override def map(vector: Vector): Vector = {
+        var myVector = vector.asBreeze
+
+        myVector :-= broadcastMean
+        myVector :/= broadcastStd
+        myVector = (myVector :* std) :+ mean
+        myVector.fromBreeze
+      }
+    }).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard deviation.
+    * For the calculation of the standard deviation with one pass over the data,
+    * the Youngs &
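The arithmetic performed in the diff's map() above can be sketched outside Flink as plain Scala (names here are illustrative, not the PR's API): each feature is shifted by the data mean, divided by the data standard deviation, and then rescaled to the user-specified target std and mean.

```scala
object StandardScalerSketch {
  // dataMean/dataStd are per-feature statistics computed from the data set;
  // targetMean/targetStd are the user-set parameters (defaults 0.0 and 1.0).
  // Each feature i is mapped to ((x_i - mean_i) / std_i) * targetStd + targetMean.
  def scale(v: Array[Double],
            dataMean: Array[Double],
            dataStd: Array[Double],
            targetMean: Double = 0.0,
            targetStd: Double = 1.0): Array[Double] =
    v.indices
      .map(i => (v(i) - dataMean(i)) / dataStd(i) * targetStd + targetMean)
      .toArray

  def main(args: Array[String]): Unit = {
    // Two features, both with data mean 2.0 and data std 1.0
    val scaled = scale(Array(1.0, 3.0), Array(2.0, 2.0), Array(1.0, 1.0))
    println(scaled.mkString(","))  // -1.0,1.0
  }
}
```

In the actual transformer these statistics arrive via a broadcast variable, so the same element-wise formula runs on every parallel map task.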
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28605215 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (quoted diff context identical to the previous message)
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/579#discussion_r28605175 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- (quoted diff context identical to the previous message)
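The StandardScaler docstring cites a single-pass method (the "Youngs &" reference, truncated in this archive) for computing each feature's mean and standard deviation in one reduce over the data. A minimal sketch of the Youngs and Cramer updating formula for one feature column follows; the names are illustrative, not the PR's extractFeatureMetrics, and a population std is used here, which may differ from the PR's choice.

```scala
object YoungsCramerSketch {
  /** One pass over the values: maintains the running sum t_n and the running
    * sum of squared deviations S_n, updated for n >= 2 as
    *   S_n = S_{n-1} + (n*x_n - t_n)^2 / (n*(n-1)).
    * Returns (mean, population standard deviation). */
  def meanAndStd(xs: Seq[Double]): (Double, Double) = {
    var n = 0
    var sum = 0.0 // t_n: running sum of the values
    var sq = 0.0  // S_n: running sum of squared deviations
    for (x <- xs) {
      n += 1
      sum += x
      if (n > 1) {
        val d = n * x - sum
        sq += d * d / (n.toDouble * (n - 1))
      }
    }
    (sum / n, math.sqrt(sq / n))
  }

  def main(args: Array[String]): Unit = {
    val (mean, std) = meanAndStd(Seq(1.0, 2.0, 3.0))
    println(s"$mean $std")  // mean 2.0, std = sqrt(2/3)
  }
}
```

The appeal of this update is that it never needs the mean up front, so mean and std can both fall out of a single combinable aggregation, which is what makes a one-pass DataSet reduce possible.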
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-94012543 Three minor comments. Otherwise it looks really good :-)
[GitHub] flink pull request: Ml branch
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/579#issuecomment-94698596 Great work Faye. Will merge.
[GitHub] flink pull request: Ml branch
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/579