[GitHub] flink pull request: Ml branch

2015-04-08 Thread fobeligi
GitHub user fobeligi opened a pull request:

https://github.com/apache/flink/pull/579

Ml branch

Implementation of StandardScaler and corresponding tests for JIRA issue FLINK-1809.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fobeligi/incubator-flink ml-branch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #579


commit 96cb2f5676e945d7bc414987934e5c854de70584
Author: fobeligi 
Date:   2015-04-01T20:31:38Z

[FLINK-1809] Add Preprocessing package and Standardizer to ML-library

commit 2e8333b74e08f0c48bb58d36f2915a9ad832c456
Author: fobeligi 
Date:   2015-04-03T16:52:35Z

[FLINK-1809] Change implementation to use Breeze.linalg library and add 
tests for Standardizer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973465
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
--- End diff --

one line break is sufficient




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973555
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
--- End diff --

Should be an ITSuite, since it starts a whole Flink cluster to execute the
job. Thus, name it ```StandardizerITSuite```. That way, it will only be
executed in the Maven verify phase.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973639
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
--- End diff --

ML ITSuites should mix in the trait ```FlinkTestBase```. This trait makes
sure that the right ```ExecutionEnvironment``` is set for the test case
execution.
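
A minimal sketch of how the renamed suite could look with the mixin (the package of ```FlinkTestBase``` is assumed here; adjust it to wherever the trait actually lives):

```scala
// Sketch only: rename the suite to an ITSuite and mix in FlinkTestBase so that
// the ExecutionEnvironment is provided by the test harness.
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.test.util.FlinkTestBase // assumed package of the trait
import org.scalatest.{FlatSpec, Matchers}

class StandardizerITSuite extends FlatSpec with Matchers with FlinkTestBase {

  behavior of "Flink's Standard Scaler"

  import StandardizerData._

  it should "first center and then properly scale the given vectors" in {
    // no explicit setParallelism needed; FlinkTestBase configures the environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // ... rest of the test as before
  }
}
```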




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973680
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
--- End diff --

line break




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973695
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
--- End diff --

line break




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973786
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
--- End diff --

This will be done by ```FlinkTestBase``` for you. Only if you have to set
the parallelism to a specific value should you override the inherited
```parallelism``` value from ```FlinkTestBase```.
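
If a test really needs a fixed value, a sketch of the override could look like this (assuming ```parallelism``` is declared as an overridable ```val``` in ```FlinkTestBase```):

```scala
class StandardizerITSuite extends FlatSpec with Matchers with FlinkTestBase {
  // only override when the test depends on a specific degree of parallelism
  override val parallelism = 2
  // ...
}
```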




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973815
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
--- End diff --

two line breaks




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973879
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
--- End diff --

The ```apply``` method can be replaced by ```scaledVector(i)```.
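
For illustration, the assertion would then read:

```scala
// Scala's function-application sugar calls apply under the hood
scaledVector(i) should be(expectedVector(i) +- 0.01)
```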




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973910
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly scale the given vectors without centering them 
first" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
--- End diff --

Again explicit parallelism.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973935
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly scale the given vectors without centering them 
first" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleMean(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonCenteredExpectedVectors.length)
+
+scaledVectors zip nonCenteredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
--- End diff --

Same with ```apply``` method.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973958
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly scale the given vectors without centering them 
first" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleMean(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonCenteredExpectedVectors.length)
+
+scaledVectors zip nonCenteredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly center the given vectors without scaling them to 
unit variance" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
--- End diff --

Explicit parallelism.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27974019
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly scale the given vectors without centering them 
first" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleMean(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonCenteredExpectedVectors.length)
+
+scaledVectors zip nonCenteredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly center the given vectors without scaling them to 
unit variance" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleStd(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonScaledExpectedVectors.length)
+
+scaledVectors zip nonScaledExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+}
+
+object StandardizerData {
+
+  val data: Seq[Vector] = List(DenseVector(Array(2104.0, 1600.0, 2400.0, 
1416.0, 3000.0, 1985.0,
+1534.0, 1427.0, 1380.0, 1494.0, 1940.0, 2000.0, 1890.0, 4478.0, 
1268.0, 2300.0, 1320.0,
+1236.0, 2609.0, 3031.0, 1767.0, 1888.0, 1604.0, 1962.0, 3890.0, 
1100.0, 1458.0, 2526.0,
+2200.0, 2637.0, 1839.0, 1000.0, 2040.0, 3137.0, 1811.0, 1437.0, 
1239.0, 2132.0, 4215.0,
+2162.0, 1664.0, 2238.0, 2567.0, 1200.0, 852.0, 1852.0, 1203.0)),
+DenseVector(Array(3., 3., 3., 2., 4., 4., 
3., 3., 3.,
+  3., 4., 3., 3., 5., 3., 4., 2., 
3., 4., 4.,
+  3., 2., 3., 4., 3., 3., 3., 3., 
3., 3., 2.,
+  1., 4., 3., 4., 3., 3., 4., 4., 
4., 2., 3.,
+  4., 3., 2., 4., 3.)))
+
+
--- End diff --

two line breaks



[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27973978
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocess/StandardizerSuite.scala
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocess
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.scalatest._
+
+
+class StandardizerSuite extends FlatSpec with Matchers {
+  behavior of "Flink's Standard Scaler"
+  import StandardizerData._
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer()
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(centeredExpectedVectors.length)
+
+scaledVectors zip centeredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly scale the given vectors without centering them 
first" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleMean(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonCenteredExpectedVectors.length)
+
+scaledVectors zip nonCenteredExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
+}
+  }
+}
+  }
+
+  it should "properly center the given vectors without scaling them to 
unit variance" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+
+val dataSet = env.fromCollection(data)
+val transformer = new Standardizer().setScaleStd(false)
+val scaledVectors = transformer.transform(dataSet).collect
+
+
+scaledVectors.length should equal(nonScaledExpectedVectors.length)
+
+scaledVectors zip nonScaledExpectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector.apply(i) should be(expectedVector.apply(i) +- 
(0.01))
--- End diff --

```apply``` method




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27974561
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
--- End diff --

Example code could be helpful here. A parameter description would also be
really helpful; see the ScalaDocs of the other algorithms.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27974882
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
+class Standardizer extends Transformer[Vector, Vector] with Serializable {
--- End diff --

I would probably go with scikit-learn's name "StandardScaler".




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27974938
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
+class Standardizer extends Transformer[Vector, Vector] with Serializable {
+
+  def setScaleMean(wm: Boolean): Standardizer = {
--- End diff --

```mean``` as the parameter name should be enough.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27974957
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
+class Standardizer extends Transformer[Vector, Vector] with Serializable {
+
+  def setScaleMean(wm: Boolean): Standardizer = {
+parameters.add(ScaleMean, wm)
+this
+  }
+
+  def setScaleStd(std: Boolean): Standardizer = {
--- End diff --

```std``` as the parameter name should be enough.




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27975352
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
+class Standardizer extends Transformer[Vector, Vector] with Serializable {
+
+  def setScaleMean(wm: Boolean): Standardizer = {
+parameters.add(ScaleMean, wm)
+this
+  }
+
+  def setScaleStd(std: Boolean): Standardizer = {
+parameters.add(ScaleStd, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val sMean = resultingParameters(ScaleMean)
+val sStd = resultingParameters(ScaleStd)
+
+input.map {
--- End diff --

A map operation won't be enough to calculate the mean and the standard
deviation of a ```DataSet``` of ```Vector```. Each vector represents one
data point consisting of several features. In order to calculate the mean for
every feature, for example, you have to sum up the vectors and divide by the
number of vectors. This gives you a vector of feature means. You can do this
with a reduce operation. Having the mean and std, you can then broadcast these
values to a ```RichMapFunction``` and scale the vectors accordingly.
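
A rough sketch of that reduce-plus-broadcast structure, using breeze for the element-wise arithmetic (names such as ```featureMeans``` and the broadcast key are illustrative; the std can be obtained from the same reduction by additionally summing the squared features):

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.Vector

// 1) reduce: element-wise sum of all vectors plus a count
val featureMeans: DataSet[Vector] = input
  .map(vector => (vector.asBreeze, 1L))
  .reduce((left, right) => (left._1 + right._1, left._2 + right._2))
  .map { case (sum, count) => (sum :/ count.toDouble).fromBreeze }

// 2) broadcast the means into a RichMapFunction that centers every vector
val centered: DataSet[Vector] = input
  .map(new RichMapFunction[Vector, Vector] {
    private var means: Vector = _

    override def open(config: Configuration): Unit = {
      // broadcast variables arrive as a java.util.List, hence get(0)
      means = getRuntimeContext.getBroadcastVariable[Vector]("featureMeans").get(0)
    }

    // scaling by a broadcast std vector would be applied here as well
    override def map(vector: Vector): Vector =
      (vector.asBreeze - means.asBreeze).fromBreeze
  })
  .withBroadcastSet(featureMeans, "featureMeans")
```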




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r27975475
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocess/Standardizer.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocess
+
+import breeze.linalg._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{Vector}
+import org.apache.flink.ml.preprocess.Standardizer.{ScaleMean, ScaleStd}
+
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has mean zero and standard deviation 
equal to one.
+  */
+class Standardizer extends Transformer[Vector, Vector] with Serializable {
+
+  def setScaleMean(wm: Boolean): Standardizer = {
+parameters.add(ScaleMean, wm)
+this
+  }
+
+  def setScaleStd(std: Boolean): Standardizer = {
+parameters.add(ScaleStd, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val sMean = resultingParameters(ScaleMean)
+val sStd = resultingParameters(ScaleStd)
+
+input.map {
+  vector => {
+scaleVector(vector, sMean, sStd)
+  }
+}
+  }
+
+  /** Scales the vector to zero mean and unit variance
+*
+* @param vector
+* @param sMean
+* @param sStd
+*/
+  private def scaleVector(vector: Vector, sMean: Boolean, sStd: Boolean): 
Vector = {
--- End diff --

Maybe we can generalize the StandardScaler by allowing the user to define a
std other than 1 and a mean other than 0. These could then be the parameter
values, with ```(0, 1)``` as the default.
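
For illustration, with user-defined targets the per-feature transformation would amount to something like this (```dataMean```/```dataStd``` computed from the input, ```targetMean```/```targetStd``` supplied by the user; all names here are illustrative):

```scala
// sketch of the generalized scaling of a single feature value x
val scaled = ((x - dataMean) / dataStd) * targetStd + targetMean
```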




[GitHub] flink pull request: Ml branch

2015-04-08 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/579#issuecomment-90936379
  
Hi Faye, I made some annotations. Ping me when you've addressed them.




[GitHub] flink pull request: Ml branch

2015-04-09 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/579#issuecomment-91286399
  
Maybe we could name the package ```org.apache.flink.ml.preprocessing``` 
instead of ```org.apache.flink.ml.preprocess```. What do you think?

If you include the JIRA issue in the title of the PR, like "[FLINK-1809]
Adds standard scaler to ml library", then all our comments here will be
automatically added to the corresponding JIRA issue.




[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412028
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
--- End diff --

Mean and standard deviation are configurable, right? Default is (0,1)?




[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412749
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
+*
+* @param dataSet The data set for which we want to calculate mean and 
variance
+* @return DataSet of Tuple2
+*/
+  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
--- End diff --

By explicitly adding the return type of Scala methods, here 
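
A minimal sketch of what that could look like for ```extractFeatureMetrics```; the 
element type of the returned ```DataSet``` is an assumption based on how the result 
is broadcast as a ```(Vector, Vector)``` tuple further up:

```
// Sketch only, assuming the metrics are broadcast as a (Vector, Vector) tuple:
// the body stays as in the PR, the change is the explicit return type.
private def extractFeatureMetrics(dataSet: DataSet[Vector]): DataSet[(Vector, Vector)] = {
  // ... unchanged implementation ...
}
```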

[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412374
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
--- End diff --

By storing ```broadcastMeanSet``` and ```broadcastStdSet``` directly as 
```BreezeVector```s, we can avoid repetitively wrapping each ```Vector``` into a 
```BreezeVector``` instance.
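
A rough sketch of that change, assuming ```asBreeze``` yields a 
```breeze.linalg.Vector[Double]```: the conversion happens once in ```open()```, so 
```map()``` no longer re-wraps the broadcast vectors for every record.

```
var broadcastMean: linalg.Vector[Double] = null
var broadcastStd: linalg.Vector[Double] = null

override def open(parameters: Configuration): Unit = {
  val (meanVector, stdVector) = getRuntimeContext()
    .getBroadcastVariable[(Vector, Vector)]("broadcastedMetrics").get(0)
  broadcastMean = meanVector.asBreeze  // wrap once per task ...
  broadcastStd = stdVector.asBreeze
}

override def map(vector: Vector): Vector = {
  val myVector = vector.asBreeze
  myVector :-= broadcastMean           // ... instead of once per element
  myVector :/= broadcastStd
  ((myVector :* std) :+ mean).fromBreeze
}
```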


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412288
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
--- End diff --

Maybe ```broadcastMean``` and ```broadcastStd``` are better names, because 
these variables are not sets.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412920
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
+*
+* @param dataSet The data set for which we want to calculate mean and 
variance
+* @return DataSet of Tuple2
--- End diff --

That's Java notation. We could write: A data set containing a single tuple 
of two vectors ```(meanVector, stdVector)```. The first vector repr
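
One possible wording of the suggested ```@return``` documentation; since the comment 
above is cut off, the exact phrasing here is an assumption:

```
/**
  * ...
  * @return A data set containing a single tuple of two vectors (meanVector, stdVector):
  *         the first vector holds the per-feature mean, the second the per-feature
  *         standard deviation.
  */
private def extractFeatureMetrics(dataSet: DataSet[Vector]): DataSet[(Vector, Vector)] = {
  // ... unchanged implementation ...
}
```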

[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412479
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
--- End diff --

typo: pass


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.

[GitHub] flink pull request: Ml branch

2015-04-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28412522
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
--- End diff --

Maybe we can put the link 
http://www.cs.yale.edu/publications/techreports/tr222.pdf here, too.
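
A scalar sketch of the Youngs & Cramer one-pass update described in that report, 
applied per feature in the scaler; the variable names are illustrative and not taken 
from the PR:

```
var count = 0
var sum = 0.0            // T_j: running sum of the observed values
var squaredDiffs = 0.0   // S_j: running sum of squared deviations from the mean

def update(x: Double): Unit = {
  count += 1
  sum += x
  if (count > 1) {
    val delta = count * x - sum
    squaredDiffs += delta * delta / (count.toDouble * (count - 1))
  }
}

def mean: Double = sum / count
// Population standard deviation; use (count - 1) instead for the sample estimate.
def std: Double = math.sqrt(squaredDiffs / count)
```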


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28580607
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest._
+
+class StandardScalerITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's Standard Scaler"
+
+  import StandardScalerData._
+
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val dataSet = env.fromCollection(data)
+val transformer = new StandardScaler()
+val scaledVectors = transformer.transform(dataSet).collect
--- End diff --

Calculating the mean and std manually and checking them against (0, 1) could be 
helpful as well. The thing is that nobody knows what the vectors look like after 
the transformation, so the test can only rely on the correctness of the expected 
vectors.
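
A sketch of such a check, reusing the ```scaledVectors``` computed in the test above; 
the tolerance is an arbitrary choice:

```
val numFeatures = scaledVectors.head.size
for (i <- 0 until numFeatures) {
  val column = scaledVectors.map(_(i))
  val columnMean = column.sum / column.size
  val columnStd =
    math.sqrt(column.map(v => (v - columnMean) * (v - columnMean)).sum / column.size)

  columnMean should be(0.0 +- 0.01)
  columnStd should be(1.0 +- 0.01)
}
```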


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28580779
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
+*
+* @param dataSet The data set for which we want to calculate mean and 
variance
+* @return DataSet of Tuple2
+*/
+  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
+
+val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, 
(Doub

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28580943
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
+*
+* @param dataSet The data set for which we want to calculate mean and 
variance
+* @return DataSet of Tuple2
+*/
+  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
+
+val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, 
(Doub

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28580329
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import java.lang.Iterable
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+import org.apache.flink.util.Collector
+
+/** Scales observations, so that all features have mean equal to zero
+  * and standard deviation equal to one
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *  val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *  val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *  transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  def setStd(std: Double): StandardScaler = {
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMeanSet: Vector = null
+  var broadcastStdSet: Vector = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(Vector,
+  Vector)]("broadcastedMetrics").get(0)
+broadcastMeanSet = broadcastedMetrics._1
+broadcastStdSet = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMeanSet.asBreeze
+myVector :/= broadcastStdSet.asBreeze
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pas over the 
data,
+* the Youngs & Cramer algorithm was used
+*
+* @param dataSet The data set for which we want to calculate mean and 
variance
+* @return DataSet of Tuple2
+*/
+  private def extractFeatureMetrics(dataSet: DataSet[Vector]) = {
+
+val metrics = dataSet.combineGroup(new GroupCombineFunction[Vector, 
(Doub

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28580480
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest._
+
+class StandardScalerITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's Standard Scaler"
+
+  import StandardScalerData._
+
+  it should "first center and then properly scale the given vectors" in {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val dataSet = env.fromCollection(data)
+val transformer = new StandardScaler()
+val scaledVectors = transformer.transform(dataSet).collect
+
+scaledVectors.length should equal(expectedVectors.length)
+
+scaledVectors zip expectedVectors foreach {
+  case (scaledVector, expectedVector) => {
+for (i <- 0 until scaledVector.size) {
+  scaledVector(i) should be(expectedVector(i) +- (0.01))
+}
+  }
+}
+  }
+}
--- End diff --

Maybe a test case with a different mean and std could be helpful to test 
that the parameters are correctly passed.
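
A sketch of such a test case, following the structure of the existing suite; the 
values 10.0 and 2.0 mirror the example in the class comment and are otherwise 
arbitrary:

```
it should "scale the vectors to a user-specified mean and standard deviation" in {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val dataSet = env.fromCollection(data)
  val transformer = new StandardScaler().setMean(10.0).setStd(2.0)
  val scaledVectors = transformer.transform(dataSet).collect

  // Recompute the per-feature mean and std and compare them to the requested values.
  val numFeatures = scaledVectors.head.size
  for (i <- 0 until numFeatures) {
    val column = scaledVectors.map(_(i))
    val columnMean = column.sum / column.size
    val columnStd =
      math.sqrt(column.map(v => (v - columnMean) * (v - columnMean)).sum / column.size)

    columnMean should be(10.0 +- 0.01)
    columnStd should be(2.0 +- 0.01)
  }
}
```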


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/579#issuecomment-93954174
  
Hi Faye, the code looks much better now. I had only some minor comments.

What would be really awesome is to create some documentation for the 
website. You can look at flink/docs/ml/ to see how the other components are 
documented. It's basically the Scaladoc of the class.

Another thing to think about is the way we use Breeze operators. At the 
moment you have specified all operators with a preceding colon. This is 
perfectly fine, but I personally use it only where it's necessary. For example, 
if you multiply a vector with a double you can also write ```vector * double```. 
Of course this operation is also element-wise, but IMHO it's easier to parse a 
formula if you use the colon syntax only when both operands are vectors. That 
way, another person can directly spot that the operation is an element-wise 
vector-vector multiplication. What do you think?
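
A small illustration of the proposed convention, where ```myVector```, 
```broadcastMean``` and ```broadcastStd``` stand for the Breeze vectors used in the 
scaler and ```std```, ```mean``` for the Double parameters:

```
myVector :-= broadcastMean           // both operands are vectors: keep the colon syntax
myVector :/= broadcastStd
myVector = (myVector * std) :+ mean  // vector * scalar: the colon can be dropped
```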


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28605205
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+
+/** Scales observations, so that all features have a user-specified mean 
and standard deviation.
+  * By default for StandardScaler transformer mean=0.0 and std=1.0.
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  /**
+   *
+   * @param std the user-specified std value. In case the user gives 0.0 
value as input,
+   *the std is set to the default value: 1.0.
+   * @return the StandardScaler object with its std value set to the 
user-specified value
+   */
+  def setStd(std: Double): StandardScaler = {
+if (std == 0.0) {
+  return this
+}
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMean: linalg.Vector[Double] = null
+  var broadcastStd: linalg.Vector[Double] = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
+  linalg.Vector[Double])]("broadcastedMetrics").get(0)
+broadcastMean = broadcastedMetrics._1
+broadcastStd = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMean
+myVector :/= broadcastStd
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pass over the 
data,
+* the Youngs & 

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28605215
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+
+/** Scales observations, so that all features have a user-specified mean 
and standard deviation.
+  * By default for StandardScaler transformer mean=0.0 and std=1.0.
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  /**
+   *
+   * @param std the user-specified std value. In case the user gives 0.0 
value as input,
+   *the std is set to the default value: 1.0.
+   * @return the StandardScaler object with its std value set to the 
user-specified value
+   */
+  def setStd(std: Double): StandardScaler = {
+if (std == 0.0) {
+  return this
+}
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMean: linalg.Vector[Double] = null
+  var broadcastStd: linalg.Vector[Double] = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
+  linalg.Vector[Double])]("broadcastedMetrics").get(0)
+broadcastMean = broadcastedMetrics._1
+broadcastStd = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMean
+myVector :/= broadcastStd
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pass over the 
data,
+* the Youngs & 

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/579#discussion_r28605175
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.DenseVector
+import breeze.numerics.sqrt
+import breeze.numerics.sqrt._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
+
+/** Scales observations, so that all features have a user-specified mean 
and standard deviation.
+  * By default for StandardScaler transformer mean=0.0 and std=1.0.
+  *
+  * This transformer takes a a Vector of values and maps it into the
+  * scaled Vector that each feature has a user-specified mean and standard 
deviation.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
+  * [[Vector]].
+  *
+  * @example
+  * {{{
+  *val trainingDS: DataSet[Vector] = 
env.fromCollection(data)
+  *
+  *val transformer = 
StandardScaler().setMean(10.0).setStd(2.0)
+  *
+  *transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
+  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * equal to 1
+  */
+class StandardScaler extends Transformer[Vector, Vector] with Serializable 
{
+
+  def setMean(mu: Double): StandardScaler = {
+parameters.add(Mean, mu)
+this
+  }
+
+  /**
+   *
+   * @param std the user-specified std value. In case the user gives 0.0 
value as input,
+   *the std is set to the default value: 1.0.
+   * @return the StandardScaler object with its std value set to the 
user-specified value
+   */
+  def setStd(std: Double): StandardScaler = {
+if (std == 0.0) {
+  return this
+}
+parameters.add(Std, std)
+this
+  }
+
+  override def transform(input: DataSet[Vector], parameters: ParameterMap):
+  DataSet[Vector] = {
+val resultingParameters = this.parameters ++ parameters
+val mean = resultingParameters(Mean)
+val std = resultingParameters(Std)
+
+val featureMetrics = extractFeatureMetrics(input)
+
+input.map(new RichMapFunction[Vector, Vector]() {
+
+  var broadcastMean: linalg.Vector[Double] = null
+  var broadcastStd: linalg.Vector[Double] = null
+
+  override def open(parameters: Configuration): Unit = {
+val broadcastedMetrics = 
getRuntimeContext().getBroadcastVariable[(linalg.Vector[Double],
+  linalg.Vector[Double])]("broadcastedMetrics").get(0)
+broadcastMean = broadcastedMetrics._1
+broadcastStd = broadcastedMetrics._2
+  }
+
+  override def map(vector: Vector): Vector = {
+var myVector = vector.asBreeze
+
+myVector :-= broadcastMean
+myVector :/= broadcastStd
+myVector = (myVector :* std) :+ mean
+return myVector.fromBreeze
+  }
+}).withBroadcastSet(featureMetrics, "broadcastedMetrics")
+  }
+
+  /** Calculates in one pass over the data the features' mean and standard 
deviation.
+* For the calculation of the Standard deviation with one pass over the 
data,
+* the Youngs & 

[GitHub] flink pull request: Ml branch

2015-04-17 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/579#issuecomment-94012543
  
Three minor comments. Otherwise it looks really good :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/579#issuecomment-94698596
  
Great work Faye. Will merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Ml branch

2015-04-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/579


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---