[GitHub] flink issue #3428: [FLINK-1743] Add multinomial logistic regression to machi...

2017-10-03 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/3428
  
Hello @bowenli86, AFAIK nobody's working on the PRs right now. I'm 
currently 100% focused on finishing my PhD, so I won't have time to review.


---


[GitHub] flink issue #3428: [FLINK-1743] Add multinomial logistic regression to machi...

2017-04-20 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/3428
  
Thank you for the info!

In that case it's now a bit ambiguous which issue this PR is solving. If 
you are introducing an interface for GLMs, we should be having this discussion 
in that context, because it changes the scope of the PR quite significantly 
regarding design decisions, compared to a simple implementation of logistic 
regression.

If you want, we can split this into different PRs and take it from there.

P.S. Any code that comes from the Spark project must be clearly labeled as 
such; please make sure to give proper attribution.


---


[GitHub] flink issue #3428: [FLINK-1743] Add multinomial logistic regression to machi...

2017-04-18 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/3428
  
Hello @mtunique and thank you for your contribution!

Could you briefly describe your changes/additions and how you have tested 
the implementation for correctness?


---


[GitHub] flink issue #3313: [FLINK-5588][ml] add a data normalizer to ml library

2017-02-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/3313
  
Hello @skonto thanks for your contribution!

I'm currently snowed under paper deadlines, so I can't give you a firm 
estimate of when I'll be able to go through this; hopefully within the next 
2-3 weeks.


---


[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...

2017-01-20 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/1849
  
Hello @gaborhermann. Personally I prefer to have PRs be as specific as 
possible, so I would recommend we try to get this merged before #2838, and then 
rebase that on master.

Given the committer load, however, this could take a while.


---


[GitHub] flink pull request #1849: [FLINK-2157] [ml] Create evaluation framework for ...

2017-01-17 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1849#discussion_r96406269
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+
+import scala.reflect.ClassTag
+
+/**
+ * Evaluation score
+ *
+ * Can be used to calculate a performance score for an algorithm, when 
provided with a DataSet
+ * of (truth, prediction) tuples
+ *
+ * @tparam PredictionType output type
+ */
+trait Score[PredictionType] {
--- End diff --

The goal is to reduce code duplication: many models can share the same 
evaluation infrastructure.
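
For illustration, a minimal sketch of a concrete score built on this trait. 
The diff above is cut off before the trait body, so the `evaluate` signature 
below is an assumption, not the PR's actual API:

```scala
import org.apache.flink.api.scala._

// Sketch only: misclassification rate over (truth, prediction) pairs.
// Any model that predicts Doubles could reuse this unchanged.
object ZeroOneLoss extends Score[Double] {
  def evaluate(pairs: DataSet[(Double, Double)]): DataSet[Double] =
    pairs
      .map(p => (if (p._1 == p._2) 0.0 else 1.0, 1L)) // (lossForPair, count)
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))   // (totalLoss, total)
      .map(t => t._1 / t._2)                          // mean loss
}
```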


---


[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...

2017-01-17 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/1849
  
Hello @skonto, this PR will probably be subsumed by #2838; you can check out 
the latest development there.


---


[GitHub] flink issue #757: [FLINK-2131][ml]: Initialization schemes for k-means clust...

2017-01-17 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/757
  
Sure @sachingoel0101 feel free to split up the PRs to reduce overhead.

For additional initialization schemes, let me throw [this recent NIPS 
paper](https://papers.nips.cc/paper/6478-fast-and-provably-good-seedings-for-k-means)
 in there, as it might be relatively easy to implement, but we can also add it 
later.


---


[GitHub] flink issue #2761: [FLINK-3551] [examples] Sync Scala Streaming Examples

2016-12-14 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2761
  
Hello @ch33hau, sorry for the late reply; I've been at a conference the 
past week. With the latest changes this LGTM. I've edited the fix version in 
JIRA to 1.2.0 to give this more visibility for the upcoming release, since 
it's very useful to have more examples.

Hopefully some committer can take a look soon. I'll ping @fhueske here; 
maybe he can shepherd the PR or assign somebody.


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-12-06 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r91119867
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -60,11 +64,9 @@ object SessionWindowing {
 input.foreach(value => {
   ctx.collectWithTimestamp(value, value._2)
   ctx.emitWatermark(new Watermark(value._2 - 1))
-  if (!fileOutput) {
-println(s"Collected: ${value}")
-  }
+  if (!fileOutput) println(s"Collected: ${value}")
--- End diff --

AFAIK we encourage adding braces even for one-line if statements, so the 
previous version was better.
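
For reference, this is the braced form being asked for, i.e. the original 
version shown in the diff above:

```scala
if (!fileOutput) {
  println(s"Collected: ${value}")
}
```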


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-12-06 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r91119955
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -77,9 +79,8 @@ object SessionWindowing {
   .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
   .sum(2)
 
-if (fileOutput) {
-  aggregated.writeAsText(params.get("output"))
-} else {
+if (fileOutput) aggregated.writeAsText(params.get("output"))
--- End diff --

Same here, braces for if.


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-12-06 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r91119626
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
 ---
@@ -27,6 +27,10 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.windowing.time.Time
 
+/**
+  * An example of grouped stream windowing into sliding time windows.
+  * This example uses [[RichParallelSourceFunction]] to generates a list 
of key-value pair.
--- End diff --

Typo: "generates" should be "generate"


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-12-06 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r91119469
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -159,11 +160,46 @@ object IncrementalLearningSkeleton {
 * Builds up-to-date partial models on new training data.
 */
   private class PartialModelBuilder extends AllWindowFunction[Int, 
Array[Double], TimeWindow] {
+
+protected def buildPartialModel(values: Iterable[Int]): Array[Double] 
= Array[Double](1)
+
 override def apply(window: TimeWindow,
-   input: Iterable[Int],
+   values: Iterable[Int],
out: Collector[Array[Double]]): Unit = {
-  out.collect(Array[Double](1.0))
+  out.collect(buildPartialModel(values))
+}
+  }
+
+  /**
+* Creates newData using the model produced in batch-processing and the
+* up-to-date partial model.
+* 
+* By defaults emits the Integer 0 for every newData and the Integer 1
--- End diff --

Typo: "defaults" should be "default"


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-25 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89645498
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+  * Read Strings from Kafka and print them to standard out.
+  * Note: On a cluster, DataStream.print() will print to the TaskManager's 
.out file!
+  *
+  * Please pass the following arguments to run the example:
+  * --topic test
+  * --bootstrap.servers localhost:9092
+  * --zookeeper.connect localhost:2181
+  * --group.id myconsumer
+  *
+  */
+object ReadFromKafka {
+
+  def main(args: Array[String]): Unit = {
+
+// parse input arguments
+val params = ParameterTool.fromArgs(args)
+
+if (params.getNumberOfParameters < 4) {
+  println("Missing parameters!\nUsage: Kafka --topic  " +
+"--bootstrap.servers  --zookeeper.connect  --group.id ")
+  return
+}
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.getConfig.disableSysoutLogging
+
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1))
+// create a checkpoint every 5 seconds
+env.enableCheckpointing(5000)
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create a Kafka streaming source consumer for Kafka 0.8.x
+val kafkaConsumer = new FlinkKafkaConsumer08(
--- End diff --

I would suggest keeping it as is for now unless a project committer 
suggests otherwise.

The linked issue would indicate that the intention is to have different 
versions, but I guess that would affect the release as well, since different 
jar files would need to be generated.


---


[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...

2016-11-25 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2838
  
> The problem is not with the evaluate(test: TestType): DataSet[Double] but
> rather with evaluate(test: TestType): DataSet[(Prediction,Prediction)].

Completely agree there. I advocated for removing/renaming the evaluate 
function; we considered using a `score` function for a more sklearn-like 
approach before, see e.g. #902. Having _some_ function that returns a 
`DataSet[(truth: Prediction, pred: Prediction)]` is useful and probably 
necessary, but we should look at alternatives, as the current state is 
confusing.
I think I like the approach you are suggesting, so feel free to come up 
with an alternative in the WIP PRs.
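
For illustration, a minimal sketch of how the two concerns could be kept 
separate (names and signatures here are hypothetical, not the PR's API):

```scala
import org.apache.flink.api.scala._

// Sketch only: one method exposes the raw (truth, prediction) pairs, and a
// separate sklearn-style score reduces them to a single metric.
trait Scored[Testing, Prediction] {
  def predictPairs(test: DataSet[Testing]): DataSet[(Prediction, Prediction)]

  def score(test: DataSet[Testing],
            metric: DataSet[(Prediction, Prediction)] => DataSet[Double])
    : DataSet[Double] =
    metric(predictPairs(test))
}
```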

Getting rid of the Pipeline requirements for recommendation algorithms 
would simplify some things. In that case we'll have to re-evaluate whether it 
makes sense for them to implement the `Predictor` interface at all; maybe we 
could have `ChainablePredictors`, but I think our hierarchy is deep enough 
already.


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89469773
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
+
+override def run(ctx: SourceContext[Int]) = {
+  Thread.sleep(15)
+  while (counter < 50) {
+ctx.collect(getNewData)
+  }
+}
+
def getNewDat

---

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89465547
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+  * Generate a String every 500 ms and write it into a Kafka topic
+  *
+  * Please pass the following arguments to run the example:
+  * --topic test --bootstrap.servers localhost:9092
+  *
+  */
+object WriteIntoKafka {
+
+  def main(args: Array[String]): Unit = {
+
+// parse input arguments
+val params = ParameterTool.fromArgs(args)
+
+if (params.getNumberOfParameters < 2) {
+  println("Missing parameters!")
+  println("Usage: Kafka --topic  --bootstrap.servers ")
+  return
+}
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.getConfig.disableSysoutLogging
+
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1))
+
+// very simple data generator
+val messageStream = env.addSource(new SourceFunction[String]() {
+  var running = true
+
+  override def run(ctx: SourceContext[String]): Unit = {
+var i = 0L
+while (this.running) {
+  ctx.collect(s"Element - ${i}")
+  i += 1
+  Thread.sleep(500)
+}
+  }
+
+  override def cancel(): Unit = running = false
+})
+
+// create a Kafka producer for Kafka 0.8.x
+val kafkaProducer = new FlinkKafkaProducer08(
--- End diff --

As before, any reason to keep using 0.8?


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89471075
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object GroupedProcessingTimeWindowExample {
--- End diff --

Some docstring similar to what the other examples have would be very useful 
here; the same goes for the Java code, which is missing it.


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89472254
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+/**
+  * Implements a windowed version of the streaming "WordCount" program.
+  *
+  * 
+  * The input is a plain text file with lines separated by newline 
characters.
+  *
+  * 
+  * Usage: WordCount
+  * --input path
+  * --output path
+  * --window n
+  * --slide n
+  * 
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
--- End diff --

Probably better to have a Scala-style link here i.e. 
`[[org.apache.flink.examples.java.wordcount.util.WordCountData]]`


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89464880
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+  * Read Strings from Kafka and print them to standard out.
+  * Note: On a cluster, DataStream.print() will print to the TaskManager's 
.out file!
+  *
+  * Please pass the following arguments to run the example:
+  * --topic test
+  * --bootstrap.servers localhost:9092
+  * --zookeeper.connect localhost:2181
+  * --group.id myconsumer
+  *
+  */
+object ReadFromKafka {
+
+  def main(args: Array[String]): Unit = {
+
+// parse input arguments
+val params = ParameterTool.fromArgs(args)
+
+if (params.getNumberOfParameters < 4) {
+  println("Missing parameters!\nUsage: Kafka --topic  " +
+"--bootstrap.servers  --zookeeper.connect  --group.id ")
+  return
+}
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.getConfig.disableSysoutLogging
+
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1))
+// create a checkpoint every 5 seconds
+env.enableCheckpointing(5000)
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create a Kafka streaming source consumer for Kafka 0.8.x
+val kafkaConsumer = new FlinkKafkaConsumer08(
--- End diff --

I understand that the Java example uses the 0.8 consumer as well; however, 
is there any reason to continue using that version rather than a more recent 
one?


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89471865
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object SessionWindowing {
--- End diff --

Same here: a docstring explaining the example is needed; please add one for 
the Java version as well.


---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Exam...

2016-11-24 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r89469371
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
--- End diff --

I don't think the way this example is implemented gets the point of the 
example across.

We are not trying to just generate a stream of 1s and 0s; the purpose is to 
show that we can use the connected stream coming from the `newData` and the 
`model` streams to read in a batch model, which we enhance with the 
`partialModel`, and then use the new data stream to continuously improve the 
partial model.

I would recommend making this step more verbose, as it is in the Java 
version of the code.
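
As a rough illustration of that more verbose style, loosely modeled on the 
Java example (the class name and method bodies are placeholders, not the PR's 
code):

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction

// Sketch only: a named CoMapFunction makes the two roles of the connected
// stream explicit instead of hiding them in two anonymous constant functions.
class Predictor extends CoMapFunction[Int, Array[Double], Int] {
  private var partialModel: Array[Double] = Array(1.0)

  // map1: a new data element arrives; predict with the current partial model
  override def map1(value: Int): Int = 0 // placeholder prediction, as before

  // map2: an updated partial model arrives; store it for later predictions
  override def map2(model: Array[Double]): Int = {
    partialModel = model
    1 // placeholder marker, mirroring the original output
  }
}

// usage: val prediction = newData.connect(model).map(new Predictor)
```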


---


[GitHub] flink issue #2764: [FLINK-5008] Update quickstart documentation

2016-11-24 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2764
  
Hello @NicoK, do you think you can include 
[FLINK-5087](https://issues.apache.org/jira/browse/FLINK-5087) in this PR, or 
should we create a new one?


---


[GitHub] flink issue #2819: [FLINK-4961] [ml] SGD for Matrix Factorization (WIP)

2016-11-23 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2819
  
Hello @gaborhermann,

I really like the idea of introducing a `MatrixFactorization` interface 
that we can then use for different specialized optimization algorithms. For 
the question I'm afraid I can't be of much help; I'll read the relevant paper 
this week and get back to you if I have any more comments.

1. I don't know enough about joins to answer this :/
2. For this we would need to test the two solutions you have proposed and 
evaluate the performance. You have listed a couple of pros/cons there; maybe 
you can elaborate?
3. If there is absolutely no benefit from using more blocks, I don't see the 
need to investigate further. We'll need to include these instructions in the 
docs.
4. I think test hardening should be done in a different PR (potentially 
covering more algorithms). For now manual tests should suffice.


---


[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...

2016-11-23 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2838
  
Hello Gabor, 

I like the idea of having a RankingScore; it seems like having that 
hierarchy with Score, RankingScore and PairWiseScore gives us the flexibility 
we need to include ranking and supervised learning evaluation under the same 
umbrella.
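
To make the idea concrete, a hypothetical shape for that hierarchy (the 
signatures are guesses based on the types used in this PR, not its actual 
code):

```scala
import org.apache.flink.api.scala._

trait Score

// Supervised evaluation over (truth, prediction) pairs.
trait PairWiseScore[P] extends Score {
  def evaluate(pairs: DataSet[(P, P)]): DataSet[Double]
}

// Ranking evaluation: rankings as (user, item, rank) triples scored against
// test ratings given as (user, item, rating).
trait RankingScore extends Score {
  def evaluate(rankings: DataSet[(Int, Int, Int)],
               test: DataSet[(Int, Int, Double)]): DataSet[Double]
}
```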

I would also encourage sharing any other ideas you broached that might 
break the API; this is still very much an evolving project and there is no 
need to shoehorn everything into an `evaluate(test: TestType): 
DataSet[Double]` function if there are better alternatives.

One thing we need to consider is how this affects cross-validation and 
model selection/hyper-parameter tuning. These two aspects of the library are 
tightly linked, and I think we'll need to work on them in parallel to find 
issues that affect both.

I recommend taking a look at the [cross-validation 
PR](https://github.com/apache/flink/pull/891) I had opened way back when, and 
making a new WIP PR that uses the current one (#2838) as a basis. Since the 
`Score` interface still exists it shouldn't require many changes, and all 
that's added is the CrossValidation class. There are other fundamental issues 
with the sampling there that we can discuss in due time.

Regarding the RankingPredictor, we should consider the use case for such an 
interface. Is it only going to be used for recommendation? If yes, what are 
the cases where we could build a Pipeline with current or future 
pre-processing steps? Could you give some pipeline examples in a 
recommendation setting?


---


[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-23 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89292988
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with 
WithParameters {
 */
   def evaluate[Testing, PredictionValue](
   testing: DataSet[Testing],
-  evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-  evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+  evaluateParameters: ParameterMap = ParameterMap.Empty)
+  (implicit evaluator: EvaluateDataSetOperation[Self, Testing, 
PredictionValue])
 : DataSet[(PredictionValue, PredictionValue)] = {
 FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
 evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+rankingPredictOperation.predictRankings(this, k, users, 
predictParameters)
+
+  def evaluateRankings(
+testing: DataSet[(Int,Int,Double)],
+evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+// todo: do not burn 100 topK into code
+predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+instance: Instance,
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training 
[[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+* Retrieving the training items.
+* Although this can be calculated from the training data, it requires 
a costly
+* [[DataSet.distinct]] operation, while in matrix factor models the 
set items could be
+* given more efficiently from the item factors.
+*/
+  def getTrainingItems: DataSet[Int] = {
+getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: 
TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, 
Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], 
exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+users.cross(items)
--- End diff --

This could very well blow up: do we have any limits on the sizes of the 
users and items DataSets?

If I understand correctly, `users` comes from the calling predictRankings 
function and contains user ids (potentially all users), and `items` are all 
the items in the training set, which could be in the millions. For a sense of 
scale, one million users crossed with one million items already yields 10^12 
candidate pairs before any exclusion is applied.


---


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-22 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I should have clarified: by documentation I meant 
that, apart from the docstrings you have added now, we also have to include 
documentation in the Flink 
[docs](https://github.com/apache/flink/tree/master/docs/dev/libs/ml) for each 
new addition.

See for example the docs for the [standard 
scaler](https://github.com/apache/flink/blob/master/docs/dev/libs/ml/standard_scaler.md).


---


[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...

2016-11-21 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2838
  
Hello @gaborhermann, thanks for making the PR!

I'll try to take a look this week; I've been busy with a couple of other 
PRs these days.




---


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-21 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314,

This PR is still missing documentation. After that is done a project 
committer will have to review it before it gets merged, which might take a 
while.

Regards,
Theodore


---


[GitHub] flink issue #2764: [FLINK-5008] Update quickstart documentation

2016-11-17 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2764
  
IIRC @vasia is using Eclipse to develop; maybe she can chime in.


---


[GitHub] flink issue #2761: [FLINK-3551] [examples] Sync Scala Streaming Examples

2016-11-16 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2761
  
Hello Lim, thank you for your contribution!

I've taken a quick look and most of these look fine, plus I see you've 
included the required tests.
I'll do a review this week and hopefully a committer will have some time to 
take a second look and merge this soon.

Regards,
Theodore


---


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Excuse my ignorance; I'm only now learning about Flink 
internals :)
It seems like the issue here was that `partitionByRange` partitions keys in 
ascending order but we want the end result in descending order.

@tfournier314 I think the following should work; here I use a key extractor 
to negate the value of the key to achieve the desired effect:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.utils._ // provides zipWithIndex

fitData.map(s => (s, 1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```


---


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I tested your code and it does seem that partitions are 
only sorted internally, which is expected. `zipWithIndex` is AFAIK unaware of 
the sorted (as in key range) order of the partitions, so it's not guaranteed 
that the "first" partition will get the `[0, partitionSize-1]` indices, the 
second `[partitionSize, 2*partitionSize]`, etc. Maybe @greghogan knows a 
solution for global sorting?

If it's not possible I think we can take a step back and see what we are 
trying to achieve here.

The task is to count the frequencies of labels and assign integer ids to 
them in order of frequency. The labels should either be categorical variables 
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique 
values might be vocabulary words, which could reach a few million unique 
values at worst.

I would argue then that, after we have performed the frequency count in a 
distributed manner, there is no need to also do the last step, assigning 
ordered indices, in a distributed manner: we can assume that all the 
(label -> frequency) values fit into the memory of one machine.

So I would recommend gathering all the data into one partition after getting 
the counts; that way we guarantee a global ordering:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.utils._ // provides zipWithIndex

fitData.map(s => (s, 1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0) // constant key routes everything to one partition
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and 
documentation.


---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87422110
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
 ---
@@ -273,6 +308,14 @@ object ALS {
 val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

You are correct, the recommendation is from the iALS paper, but I'm not sure 
whether the same holds for ALS. I trust your judgment here, since I'm not as 
familiar with xALS as I should be to have a good intuition about this.


---


[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-10 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87421513
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -675,7 +756,69 @@ object ALS {
               collector.collect((blockID, array))
             }
           }
-    }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+    }
+
+    // broadcasting XtX matrix in the implicit case
+    val updatedFactorMatrix = if (implicitPrefs) {
+      newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+    } else {
+      newMatrix
+    }
+
+    updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+    * Computes the XtX matrix for the implicit version before updating the factors.
+    * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
+    * iteration, so we represent it as a [[DataSet]] with a single element containing the matrix.
+    *
+    * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+    * then sums all these computed matrices to get `X^T * X`.
+    */
+  private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+    val triangleSize = factors * (factors - 1) / 2 + factors
+
+    type MtxBlock = (Int, Array[Array[Double]])
+    // construct XtX for all blocks
+    val xtx = x
+      .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+        var xtxForBlock: Array[Double] = null
+
+        override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
+                                  out: Collector[Array[Double]]): Unit = {
+
+          if (xtxForBlock == null) {
+            // creating the matrix if not yet created
+            xtxForBlock = Array.fill(triangleSize)(0.0)
+          } else {
+            // erasing the matrix
+            var i = 0
+            while (i < xtxForBlock.length) {
--- End diff --

I don't imagine this making a major difference in performance, so let's 
just go with the cleaner code and use `fill`.

I wish we had an easy-to-use, integrated way to do proper profiling, so that 
such decisions would be easier (i.e. if this is 0.5% of the CPU cost, then optimizing 
it is pointless, but right now we don't know).
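
For reference, both variants side by side (a sketch; `xtxForBlock` and
`triangleSize` as in the diff above):

```scala
// re-create the buffer (allocates a new array on every call)
xtxForBlock = Array.fill(triangleSize)(0.0)

// or zero the existing buffer in place (no allocation, same effect as the loop)
java.util.Arrays.fill(xtxForBlock, 0.0)
```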




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87202604
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -675,7 +756,69 @@ object ALS {
               collector.collect((blockID, array))
             }
           }
-    }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+    }
+
+    // broadcasting XtX matrix in the implicit case
+    val updatedFactorMatrix = if (implicitPrefs) {
+      newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+    } else {
+      newMatrix
+    }
+
+    updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+  }
+
+  /**
+    * Computes the XtX matrix for the implicit version before updating the factors.
+    * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
+    * iteration, so we represent it as a [[DataSet]] with a single element containing the matrix.
+    *
+    * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+    * then sums all these computed matrices to get `X^T * X`.
+    */
+  private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
+  DataSet[Array[Double]] = {
+    val triangleSize = factors * (factors - 1) / 2 + factors
+
+    type MtxBlock = (Int, Array[Array[Double]])
+    // construct XtX for all blocks
+    val xtx = x
+      .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+        var xtxForBlock: Array[Double] = null
+
+        override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
+                                  out: Collector[Array[Double]]): Unit = {
+
+          if (xtxForBlock == null) {
+            // creating the matrix if not yet created
+            xtxForBlock = Array.fill(triangleSize)(0.0)
+          } else {
+            // erasing the matrix
+            var i = 0
+            while (i < xtxForBlock.length) {
--- End diff --

Any reason why `fill` is not/cannot be used here?




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87199446
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -273,6 +308,14 @@ object ALS {
     val defaultValue: Option[Int] = Some(10)
   }
 
+  case object ImplicitPrefs extends Parameter[Boolean] {
--- End diff --

Can't find a way to comment on line 264/299, but we should take the 
opportunity to set the default number of factors to a more reasonable 50, and 
add the following recommendation to the docstring and documentation:

> we recommend working with the highest number of factors feasible within 
computational limitations.

Which comes straight from the iALS paper.
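
Concretely, something like this for the existing parameter object in `ALS.scala`
(a sketch of the suggested change, not a final patch):

```scala
case object NumFactors extends Parameter[Int] {
  // bumped from 10, per the iALS paper's recommendation quoted above
  val defaultValue: Option[Int] = Some(50)
}
```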




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87195483
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -156,6 +171,26 @@ class ALS extends Predictor[ALS] {
     this
   }
 
+  /** Sets the input observations to be implicit, thus using the iALS algorithm for learning.
--- End diff --

The docstring is not worded correctly, as the passed argument could be true 
or false.

It should be prefixed with something like "When set to true, we assume 
implicit observations..."
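
i.e., roughly along these lines (a sketch of the suggested wording; the setter
body follows the usual FlinkML pattern and may differ from the PR):

```scala
/** When set to true, we assume implicit observations and learn with the iALS
  * variant of the algorithm; when set to false, the input is treated as
  * explicit ratings and plain ALS is used.
  */
def setImplicit(implicitPrefs: Boolean): ALS = {
  parameters.add(ImplicitPrefs, implicitPrefs)
  this
}
```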




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r87201508
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -535,8 +581,17 @@ object ALS {
     itemOut: DataSet[(Int, OutBlockInformation)],
     userIn: DataSet[(Int, InBlockInformation)],
     factors: Int,
-    lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+    lambda: Double, blockIDPartitioner: FlinkPartitioner[Int],
+    implicitPrefs: Boolean,
+    alpha: Double):
   DataSet[(Int, Array[Array[Double]])] = {
+    // retrieve broadcast XtX matrix in implicit case
+    val XtXtoBroadcast = if (implicitPrefs) {
--- End diff --

I'm a bit confused by the notation here: is this matrix the `YtY` matrix 
from the paper? If so, I would recommend sticking to the notation of the paper 
to avoid confusion.




[GitHub] flink pull request #2740: [FLINK-4964] [ml]

2016-11-09 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87192044
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+  * String Indexer
+  */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None
+
+
+  def setHandleInvalid(value: String): this.type ={
+    parameters.add( HandleInvalid, value )
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some( "skip" )
+  }
+
+  // ==================================== Factory methods ====================================
+
+  def apply(): StringIndexer ={
+    new StringIndexer( )
+  }
+
+  // ======================================= Operations ======================================
+
+  /**
+    * Trains [[StringIndexer]] by learning the count of each string in the input DataSet.
+    */
+
+  implicit def fitStringIndexer ={
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap, input: DataSet[String]):Unit ={
+        val metrics = extractIndices( input )
+        instance.metricsOption = Some( metrics )
+      }
+    }
+  }
+
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

Could you explain what the mapping is doing here? If you are trying to sort, 
shouldn't you be using `.sortPartition()` after the partition?
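
For example (a sketch of the suggested fix; assumes the count is field 1, that
`Order` is imported from `org.apache.flink.api.common.operators`, and that a
global order across partitions is not required at this point):

```scala
val mapping = input
  .mapWith(s => (s, 1L))
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))
  .partitionByRange(1)
  .sortPartition(1, Order.DESCENDING) // order each partition by count
  .zipWithIndex
```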




[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...

2016-11-09 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
@gaborhermann Yup, the approach taken by the Spark community for testing is 
closer to what we would like to have for non-deterministic algorithms, but what 
you have implemented now should suffice, on the assumption that the ALS 
implementation is correct.

@tillrohrmann initially implemented ALS, so I'm not sure how he arrived at 
the expected results. It would be a good idea for the future to document how we 
generate test data, so that it's easy to replicate and validate the process. That 
should be enough for deterministic algorithms; for non-deterministic ones we 
should have proxies, like measuring the reconstruction error etc.
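
For the non-deterministic case, such a proxy could look roughly like this (a
sketch with illustrative names and tolerance, not actual test code; `ratings`
is assumed to be a `Seq[(Int, Int, Double)]` of training triples):

```scala
// train on (user, item, rating) triples, then require that the model
// reconstructs the training ratings with a low mean squared error
val predictions = als.predict(env.fromCollection(ratings.map(r => (r._1, r._2)))).collect()
val truth = ratings.map { case (u, i, r) => ((u, i), r) }.toMap
val mse = predictions.map { case (u, i, p) => math.pow(p - truth((u, i)), 2) }.sum /
  predictions.length
mse should be < 0.1 // tolerance depends on the data, factors and iterations
```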

I'll take a look at the code again now, and will add comments if I find 
something. Otherwise I hope @mbalassi can find some time to review and merge if 
no objections come up.




[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...

2016-11-09 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
Hello @gaborhermann,

Yes, I think you are right in that respect; I just wanted to note that we 
should perform some comparative benchmarks in the future.

So the benchmarks look good IMHO. We now need to address the couple of 
comments I had, namely splitting up the tests and, if possible, deciphering why a 
`java.Iterable` was used in that spot.

I was also wondering: where did you get the reference data for the expected 
results in the test?




[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...

2016-11-08 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
Thank you @jfeher!

Could you clarify what you mean by filtering the data to get unique 
item-user pairs? Is this because the iALS algorithm only supports binary 
interactions (i.e. number of interactions does not play a role)?

Any idea how these runtime numbers compare to alternative implementations 
(Spark, [Cython](https://github.com/benfred/implicit))?




[GitHub] flink issue #2684: Add EvaluateDataSet Operation for LabeledVector - This cl...

2016-11-03 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2684
  
Hello @tillrohrmann, I think you make some valid points. My original idea 
was to move completely to `LabeledVector` as the de facto format for 
supervised learning problems and do away with `(Vector, Double)` tuples.
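
For illustration, moving from the tuple representation to the proposed one is a
one-liner (a sketch; `tuples` stands for any `DataSet[(Vector, Double)]`):

```scala
// (Vector, Double) tuples -> LabeledVector, the proposed de facto format
val labeled: DataSet[LabeledVector] =
  tuples.map { case (vector, label) => LabeledVector(label, vector) }
```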

What you suggest would add some extra flexibility though, esp. in terms of 
`(FeatureVector, LabelVector)` tuples which we have discussed in the context of 
ANNs.

@tfournier314 Could you investigate the applicability of the scheme that 
Till recommended?




[GitHub] flink issue #2735: [FLINK-2094] implements Word2Vec for FlinkML

2016-11-03 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2735
  
Thank you for your contribution Kalman!

I just took a brief look; this is a big PR, so it will probably take some time 
to review.

For now, a few things that jump to mind:

* We'll need to add docs for the algorithm, which should be example heavy. 
[Here's](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/standard_scaler.html) a simple example for another pre-processing algorithm. I see you already have 
extensive ScalaDocs; we could probably replicate those in the docs.
* Have you tested it on a relatively large-scale dataset? Ideally in a 
distributed setting where the input files are on HDFS. This way we test the 
scalability of the implementation, which is usually where problems arise.
* Have you compared the output with a reference implementation? My 
knowledge of word2vec is not very deep, but as far as I understand the output is 
non-deterministic, so we would need some sort of proxy to evaluate the 
overall correctness of the implementation.
* Finally, I see this introduces a new nlp package. I'm not sure how to 
treat this (and related algorithms, say TF-IDF), as they are not necessarily 
NLP-specific: even though they stem from that field, you could treat any sequence 
of objects as a "sentence" and embed them. I would favor including them as 
pre-processing steps, and hence inheriting from the `Transformer` interface, 
perhaps in a feature pre-processing package.

Regards,
Theodore




[GitHub] flink issue #2682: [FLINK-4886] [docs] Update ML quickstart loading svm test...

2016-10-28 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2682
  
Thank you @ch33hau I'll also mark the issue as a duplicate!




[GitHub] flink issue #2684: Add EvaluateDataSet Operation for LabeledVector - This cl...

2016-10-27 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2684
  
One last thing, could you change this PR's title to `[FLINK-4865] [ml]  Add 
EvaluateDataSet operation for LabeledVector`

This is the standard naming for PRs: issue number, followed by the component, 
followed by the description. That way the PR gets linked back to the JIRA issue 
automatically.




[GitHub] flink issue #2684: Add EvaluateDataSet Operation for LabeledVector - This cl...

2016-10-27 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2684
  
> We don't need to care about Jenkins/Travis fails, do we ?

Normally we should, but there are so many unstable tests and issues with Travis 
that it's hard to keep track now.
You should generally make sure the failures don't originate from code you 
have contributed; beyond that, it's up to the committer to decide.




[GitHub] flink issue #2684: Add EvaluateDataSet Operation for LabeledVector - This cl...

2016-10-25 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2684
  
Thank you for your contribution Thomas!

The changes look mostly fine I think; my only concern is the need to cast 
the types in lines 
[215-216](https://github.com/apache/flink/pull/2684/files#diff-c74d49c94a86f962939a42362f3ea190R215). 
I'm not sure if there is a more elegant way to do that.

I'm going to ping @chiwanpark, @mbalassi and @tillrohrmann; if anyone has 
some time to review and merge this, let us know in the comments.

As a side note, I've noticed that we don't include the evaluate operation 
in the documentation of all Predictors, so I've created 
[FLINK-4908](https://issues.apache.org/jira/browse/FLINK-4908) to track that.




[GitHub] flink issue #2668: Add EvaluateDataSetOperation for LabeledVector. This clos...

2016-10-21 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2668
  
Hello Thomas, thank you for your contribution!

I took a brief look; some initial comments:

This PR seems to be making changes to `MLUtils`, which AFAIK is outside the 
scope of this issue. I would recommend isolating changes into separate 
issues and PRs.

I also see a lot of style changes to existing code. The code style we try 
to follow is [this one](https://github.com/databricks/scala-style-guide); I 
would recommend reviewing it and trying to follow it.

As a rule of thumb we don't make style changes to existing code, unless the 
existing code does not conform to the linked style. Even in that case I would 
recommend opening a separate PR with only style changes, as it makes reviewing 
the core PR (which is the added code here) easier.

So I'd recommend removing the style changes from this PR. If there is 
existing code that violates the linked style, we can open a new PR.




[GitHub] flink pull request #2658: [FLINK-4850] [ml] FlinkML - SVM predict Operation ...

2016-10-19 Thread thvasilo
GitHub user thvasilo opened a pull request:

https://github.com/apache/flink/pull/2658

[FLINK-4850] [ml] FlinkML - SVM predict Operation for Vector and not 
LabeledVector

The current version of the quickstart guide includes erroneous code; this PR 
changes the function calls to have correct signatures and arguments.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thvasilo/flink quickstart-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2658.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2658


commit 6104b02244c08d1c8669dd3265516c915d82d8c4
Author: Theodore Vasiloudis <t...@sics.se>
Date:   2016-10-19T12:09:09Z

Fixed code for FlinkML quickstart guide.






[GitHub] flink issue #1849: [FLINK-2157] [ml] Create evaluation framework for ML libr...

2016-10-04 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/1849
  
@gaborhermann In terms of missing features, documentation is definitely 
missing, as @rawkintrevo mentioned.

For the issues mentioned in the JIRA ticket you linked, I've replied on the 
dev list thread you started; all valid points re. adjusting this to handle 
recommendations.




[GitHub] flink issue #2542: [FLINK-4613] [ml] Extend ALS to handle implicit feedback ...

2016-09-29 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
@gaborhermann I think having a larger-scale test would boost our confidence 
in the implementation, and maybe point out problems that do not manifest 
with small data, which is very common.

If you plan to do it anyway, we might as well do it before merging the code, 
but it's not a blocker.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81159482
  
--- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) implementation" +
+    "for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
--- End diff --

AFAIK in the rest of the FlinkML tests we just use `val env = 
ExecutionEnvironment.getExecutionEnvironment`. I don't know if that policy has 
changed; maybe @tillrohrmann can clarify.

For now I would say to just split the tests.




[GitHub] flink pull request #2542: [FLINK-4613] [ml] Extend ALS to handle implicit fe...

2016-09-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r81159171
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -581,6 +637,16 @@ object ALS {
         val userXy = new ArrayBuffer[Array[Double]]()
         val numRatings = new ArrayBuffer[Int]()
 
+        var precomputedXtX: Array[Double] = null
+
+        override def open(config: Configuration): Unit = {
+          // retrieve broadcasted precomputed XtX if using implicit feedback
+          if (implicitPrefs) {
+            precomputedXtX = getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+              .iterator().next()
+          }
+        }
+
         override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
--- End diff --

We can ping @tillrohrmann here; as the original author, maybe he has some 
input.




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860723
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do not represent an explicit
+preference of the user, just the implicit information how many times the user consumed the
+(Default value: false)
+  
+
+  
+  
+Alpha
+
+  
+Weight of the positive implicit observations. Should be non-negative.
+Only relevant when ImplicitPrefs is set to true.
+(Default value: 1)
--- End diff --

Can you provide some motivation for this default value? From the paper I 
see:

> In our experiments, setting α = 40 was found to produce good results.




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80862241
  
--- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) implementation" +
+    "for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
+    import ExampleMatrix._
+
+    val rand = scala.util.Random
+    val numBlocks = 3
+    // randomly split matrix to blocks
+    val blocksY = Y
+      // add a random block id to every row
+      .map { row =>
+        (rand.nextInt(numBlocks), row)
+      }
+      // get the block via grouping
+      .groupBy(_._1).values
+      // add a block id (-1) to each block
+      .map(b => (-1, b.map(_._2)))
+      .toSeq
+
+    // use Flink to compute YtY
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val distribBlocksY = env.fromCollection(blocksY)
+
+    val YtY = ALS
+      .computeXtX(distribBlocksY, factors)
+      .collect().head
+
+    // check YtY size
+    YtY.length should be (factors * (factors - 1) / 2 + factors)
+
+    // check result is as expected
+    expectedUpperTriangleYtY
+      .zip(YtY)
+      .foreach { case (expected, result) =>
+        result should be (expected +- 0.1)
+      }
+
+    // temporary directory to avoid too few memory segments
+    val tempDir = CommonTestUtils.getTempDir + "/"
+
+    // factorize matrix with implicit ALS
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(blocks)
+      .setNumFactors(factors)
+      .setImplicit(true)
+      .setAlpha(alpha)
+      .setSeed(seed)
+      .setTemporaryPath(tempDir)
+
+    val inputDS = env.fromCollection(implicitRatings)
+
+    als.fit(inputDS)
+
+    // check predictions on some user-item pairs
+    val testData = env.fromCollection(expectedResult.map {
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = als.predict(testData).collect()
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt((uID, iID)) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 1e-5)
+      }
+    }
+
+  }
+
+}
+
+object ExampleMatrix {
--- End diff --

Data should go to the `Recommendation.scala` file, as with the plain ALS 
matrix.




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80861067
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -581,6 +637,16 @@ object ALS {
         val userXy = new ArrayBuffer[Array[Double]]()
         val numRatings = new ArrayBuffer[Int]()
 
+        var precomputedXtX: Array[Double] = null
+
+        override def open(config: Configuration): Unit = {
+          // retrieve broadcasted precomputed XtX if using implicit feedback
+          if (implicitPrefs) {
+            precomputedXtX = getRuntimeContext.getBroadcastVariable[Array[Double]]("XtX")
+              .iterator().next()
+          }
+        }
+
         override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
--- End diff --

Is this a Java `Iterable` here? Any reason to use it instead of the Scala 
`Iterable` trait?
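
If it turns out to be forced on us by the Java `CoGroupFunction` interface, the
usual bridge would be (a sketch; `left` is the coGroup parameter from the diff):

```scala
import scala.collection.JavaConverters._

// convert the Java iterable once at the boundary, then use idiomatic Scala
val leftRows = left.asScala
```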




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80862018
  
--- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ImplicitALSTest.scala ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest._
+
+import scala.language.postfixOps
+import org.apache.flink.api.scala._
+import org.apache.flink.core.testutils.CommonTestUtils
+
+class ImplicitALSTest
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  override val parallelism = 2
+
+  behavior of "The modification of the alternating least squares (ALS) implementation" +
+    "for implicit feedback datasets."
+
+  it should "properly compute Y^T * Y, and factorize matrix" in {
--- End diff --

Are you testing two functionalities in this test? If so, I suggest splitting 
them into two test cases.




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860057
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -49,6 +49,21 @@ By applying this step alternately to the matrices $U$ and $V$, we can iterativel
 
 The matrix $R$ is given in its sparse representation as a tuple of $(i, j, r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix value at position $(i,j)$.
 
+An alternative model can be used for _implicit feedback_ datasets.
+These datasets only contain implicit feedback from the user
+in contrast to datasets with explicit feedback like movie ratings.
+For example users watch videos on a website and the website monitors which user
+viewed which video, so the users only provide their preference implicitly.
+In these cases the feedback should not be treated as a
+rating, but rather an evidence that the user prefers that item.
+Thus, for implicit feedback datasets there is a slightly different
+minimalization problem to solve (see [Hu et al.](http://dx.doi.org/10.1109/ICDM.2008.22) for details).
--- End diff --

Change "minimalization" to "optimization".




[GitHub] flink pull request #2542: [FLINK-4613] Extend ALS to handle implicit feedbac...

2016-09-28 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2542#discussion_r80860320
  
--- Diff: docs/dev/libs/ml/als.md ---
@@ -99,6 +114,26 @@ The alternating least squares implementation can be controlled by the following
 
   
   
+ImplicitPrefs
+
+  
+Implicit property of the observations, meaning that they do not represent an explicit
+preference of the user, just the implicit information how many times the user consumed the
--- End diff --

Missing word at the end "consumed the ???". Would also change "explicit 
preference" to "explicit rating from the user".




[GitHub] flink issue #2542: [FLINK-4613] Extend ALS to handle implicit feedback datas...

2016-09-28 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2542
  
Hello @gaborhermann, thank you for your contribution! 
Are the numbers here non-zero entries in a matrix?
If that is the case, do you think it would be possible to test this on some 
larger-scale datasets?

This would bring it closer to the actual use cases someone using Flink might 
have.




[GitHub] flink pull request #2393: [trivial] Fix typo in dosctring

2016-08-19 Thread thvasilo
GitHub user thvasilo opened a pull request:

https://github.com/apache/flink/pull/2393

[trivial] Fix typo in dosctring

There is a small typo in the getBufferTimeout docstring; this fixes it.

Talked with @tillrohrmann about whether it's worth opening a PR for 
something like this; I guess it might be easier for a committer to gather a few 
of these and commit them all together.

Cheers,
Theo

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thvasilo/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2393


commit afd8ac756e8cb480bb96c6b5bb1c58e984730f9a
Author: Theodore Vasiloudis <t...@sics.se>
Date:   2016-08-19T22:00:36Z

Update StreamExecutionEnvironment.java






[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...

2016-05-17 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r63519957
  
--- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyITSuite.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class RegularizationPenaltyITSuite extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

@tillrohrmann Do tests like these, which only test components without an 
`ExecutionEnvironment` being created, get named differently (i.e. no ITSuite in 
the class name)?




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...

2016-05-17 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r63519390
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.{Vector, BLAS}
+import org.apache.flink.ml.math.Breeze._
+import breeze.linalg.{norm => BreezeNorm}
+
+/** Represents a type of regularization penalty
+  *
+  * Regularization penalties are used to restrict the optimization problem to solutions with
+  * certain desirable characteristics, such as sparsity for the L1 penalty, or penalizing large
+  * weights for the L2 penalty.
+  *
+  * The regularization term, `R(w)` is added to the objective function, `f(w) = L(w) + lambda*R(w)`
+  * where lambda is the regularization parameter used to tune the amount of regularization applied.
+  */
+trait RegularizationPenalty extends Serializable {
+
+  /** Calculates the new weights based on the gradient and regularization penalty
+    *
+    * @param weightVector The weights to be updated
+    * @param gradient The gradient used to update the weights
+    * @param regularizationConstant The regularization parameter to be applied
+    * @param learningRate The effective step size for this iteration
+    * @return Updated weights
+    */
+  def takeStep(
+      weightVector: Vector,
+      gradient: Vector,
+      regularizationConstant: Double,
+      learningRate: Double)
+    : Vector
+
+  /** Adds regularization to the loss value
+    *
+    * @param oldLoss The loss to be updated
+    * @param weightVector The gradient used to update the loss
+    * @param regularizationConstant The regularization parameter to be applied
+    * @return Updated loss
+    */
+  def regLoss(oldLoss: Double, weightVector: Vector, regularizationConstant: Double): Double
+
+}
+
+
+/** `L_2` regularization penalty.
+  *
+  * The regularization function is the square of the L2 norm `1/2*||w||_2^2`
+  * with `w` being the weight vector. The function penalizes large weights,
+  * favoring solutions with more small weights rather than few large ones.
+  */
+object L2Regularization extends RegularizationPenalty {
+
+  /** Calculates the new weights based on the gradient and L2 regularization penalty
+    *
+    * The updated weight is `w - learningRate *(gradient + lambda * w)` where
+    * `w` is the weight vector, and `lambda` is the regularization parameter.
+    *
+    * @param weightVector The weights to be updated
+    * @param gradient The gradient according to which we will update the weights
+    * @param regularizationConstant The regularization parameter to be applied
+    * @param learningRate The effective step size for this iteration
+    * @return Updated weights
+    */
+  override def takeStep(
+      weightVector: Vector,
+      gradient: Vector,
+      regularizationConstant: Double,
+      learningRate: Double)
+    : Vector = {
+    // add the gradient of the L2 regularization
+    BLAS.axpy(regularizationConstant, weightVector, gradient)
+
+    // update the weights according to the learning rate
+    BLAS.axpy(-learningRate, gradient, weightVector)
+
+    weightVector
+  }
+
+  /** Adds regularization to the loss value
+    *
+    * The updated loss is `l + lambda * 1/2*||w||_2^2` where `l` is the old loss,
--- End diff --

I would recommend spelling out `loss` here, as in some fonts it's hard to tell 
the letter "l" apart from the number 1.



[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...

2016-05-17 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r63517610
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -272,7 +272,7 @@ abstract class GradientDescent extends IterativeSolver {
  * The regularization function is `1/2 ||w||_2^2` with `w` being the weight vector.
   */
 class GradientDescentL2 extends GradientDescent {
-
+  //TODO(skavulya): Pass regularization penalty as a parameter
--- End diff --

I've mentioned this in the previous PR but am adding it here for completeness:

I'm in favor of adding the regularization penalty as a parameter of the 
optimizer.

However, that would involve changes that are perhaps beyond the scope of this 
PR; currently, with only SGD available, we don't have to worry about the 
applicability of L1/L2 regularization, but we should add a note for when L-BFGS 
gets implemented.

Depending on how much work it would be for @skavulya to make the change here, 
we can either have a separate PR for that or include it here.
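
For illustration, the parameter-based variant could look roughly like this (a
sketch with illustrative names like `RegularizationPenaltyValue` and
`NoRegularization`, not a final design):

```scala
case object RegularizationPenaltyValue extends Parameter[RegularizationPenalty] {
  // no penalty unless the user asks for one
  val defaultValue: Option[RegularizationPenalty] = Some(NoRegularization)
}

// usage: sgd.setRegularizationPenalty(L2Regularization)
```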




[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and...

2016-05-17 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r63517587
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -272,7 +272,7 @@ abstract class GradientDescent extends IterativeSolver {
  * The regularization function is `1/2 ||w||_2^2` with `w` being the weight vector.
   */
 class GradientDescentL2 extends GradientDescent {
-
+  //TODO(skavulya): Pass regularization penalty as a parameter
--- End diff --

I've mentioned this in the previous PR but am adding it here for completeness:

I'm in favor of adding the regularization penalty as a parameter of the 
optimizer.

However, that would involve changes that are perhaps beyond the scope of this 
PR; currently, with only SGD available, we don't have to worry about the 
applicability of L1/L2 regularization, but we should add a note for when L-BFGS 
gets implemented.

Depending on how much work it would be for @skavulya to make the change here, 
we can either have a separate PR for that or include it here.




[GitHub] flink pull request: [FLINK-1979] Lossfunctions

2016-05-11 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/656#issuecomment-218399619
  
Hello @skavulya, thank you for your contribution; I'm looking forward to the 
PR!

Re. the regularization penalty, we changed it to make the user's choice easier, 
e.g. preventing the use of an L1 penalty when L-BFGS is used.

Personally I would prefer to go back to the original design and have it as 
a settable parameter, and apply some sanity checking within the optimizer to 
avoid cases like the one mentioned above.
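
e.g., a sanity check along these lines inside the solver (a sketch; the names
are illustrative):

```scala
// reject combinations that are known not to work, e.g. L1 with L-BFGS,
// whose line search assumes a differentiable objective
def validateRegularization(penalty: RegularizationPenalty): Unit = penalty match {
  case L1Regularization => throw new IllegalArgumentException(
    "L1 regularization is non-smooth and cannot be used with L-BFGS; use SGD instead.")
  case _ => // everything else is fine
}
```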

I would suggest you open the PR and we can discuss it there.




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-27 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-215201773
  
Any idea what might be causing the error in FlinkML? I can't pinpoint it to 
any specific source file. 




[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-21 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1849#issuecomment-212977748
  
I did some testing and I think the problem has to do with the types that 
each scaler expects.

`StandardScaler` has fit and transform operations for `DataSet`s of type 
`Vector`, `LabeledVector`, and `(T <: Vector, Double)`, while `MinMaxScaler` 
does not provide one for `(T <: Vector, Double)`. If you add the operations, the 
code runs fine (at least re. your first comment).

So this is a bug unrelated to this PR, I think. The question becomes whether we 
want to support all three of these types. My recommendation would be to 
support only `Vector` and `LabeledVector`, and remove all operations that 
work on `(Vector, Double)` tuples. I will file a JIRA for that.

There is an argument to be made about whether some pre-processing steps are 
supervised (e.g. [PCA vs. 
LDA](https://stats.stackexchange.com/questions/161362/supervised-dimensionality-reduction)), 
but in the strict definition of a transformer we shouldn't care about the 
label, only the features, so that operation can be implemented at the 
`Transformer` level.




[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-21 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1849#issuecomment-212942014
  
Well, Breeze was recently bumped to 0.12 in #1876; maybe that has something to 
do with it, but let's see.

Any chance you can try with the previous Breeze version?




[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-21 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1849#issuecomment-212793430
  
Hello Trevor,

Thanks for taking the time to look at this; I'll investigate these issues
today, hopefully.

-- 
Sent from a mobile device. May contain autocorrect errors.
On Apr 21, 2016 12:16 AM, "Trevor Grant" <notificati...@github.com> wrote:

> Also two quick issues.
>
> *pipelines*
>
> val scaler = MinMaxScaler()
> val pipeline = scaler.chainPredictor(mlr)
> val evaluationDS = survivalLV.map(x => (x.vector, x.label))
>
> pipeline.fit(survivalLV)
> scorer.evaluate(evaluationDS, pipeline).collect().head
>
> When using this with a ChainedPredictor as the predictor I get the
> following error:
> error: could not find implicit value for parameter evaluateOperation:
> org.apache.flink.ml.pipeline.EvaluateDataSetOperation[org.apache.flink.ml.pipeline.ChainedPredictor[org.apache.flink.ml.preprocessing.MinMaxScaler,org.apache.flink.ml.regression.MultipleLinearRegression],(org.apache.flink.ml.math.Vector, Double),Double]
>
> *MinMaxScaler()*
> Merging for me broke the following code:
>
> val scaler = MinMaxScaler()
> val scaledSurvivalLV = scaler.transform(survivalLV)
>
> With the following error (omiting part of the stack trace)
> Caused by: java.lang.NoSuchMethodError:
> breeze.linalg.Vector$.scalarOf()Lbreeze/linalg/support/ScalarOf;
> at org.apache.flink.ml.preprocessing.MinMaxScaler$$anonfun$3.apply(MinMaxScaler.scala:156)
> at org.apache.flink.ml.preprocessing.MinMaxScaler$$anonfun$3.apply(MinMaxScaler.scala:154)
> at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:584)
> at org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver.collect(ChainedAllReduceDriver.java:93)
> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> I'm looking for a work around. Just saying I found a regression. Other
> than that, looks/works AWESOME well done.
>
> —
> You are receiving this because you authored the thread.
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/flink/pull/1849#issuecomment-212633912>





[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1849#issuecomment-205214498
  
@mbalassi @tillrohrmann 

Closed the previous PR and opened this one for the evaluation framework, as 
I had some issues with rebasing. 


---


[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-04 Thread thvasilo
Github user thvasilo closed the pull request at:

https://github.com/apache/flink/pull/871


---


[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/871#issuecomment-205214045
  
Moving this to #1849 because git ate my changes when rebasing under the new
directory structure.


---


[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-04-04 Thread thvasilo
GitHub user thvasilo opened a pull request:

https://github.com/apache/flink/pull/1849

[FLINK-2157] [ml] Create evaluation framework for ML library

Using this PR instead of #871 due to rebase issues.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thvasilo/flink evaluation-rebase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1849.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1849


commit 8ce81d55cfffb70c8971fae47ea315990a7ea97f
Author: Theodore Vasiloudis <t...@sics.se>
Date:   2016-04-04T09:36:26Z

[FLINK-2157] [ml] Create evaluation framework for ML library




---


[GitHub] flink pull request: [FLINK-2157] [ml] Create evaluation framework ...

2016-03-29 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/871#issuecomment-203075630
  
@rawkintrevo AFAIK it's a lack of committer time to review it. If 
@tillrohrmann can find some time to review this, I'll refactor it to get rid of 
the conflicts, and hopefully we can merge it and move on to #891 and #902.


---


[GitHub] flink pull request: [FLINK-3464] [docs] Add SBT template documenta...

2016-02-24 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1688#issuecomment-188278485
  
In terms of usability I think it is. The whole point of the quickstart guide 
is to get users up and running as quickly as possible. For their first toy 
project, people probably will not care about the customization that giter8 
offers.

Most importantly, adding the requirement to visit some website that asks you 
to curl a script and pipe it to bash (something this commit actually fixes on 
the Flink side) in order to install *another* tool (Conscript), which in turn 
lets you install the original tool, is too much effort for a *quick*start guide.

So I vote for the `git clone` default, with giter8 as an option.




---


[GitHub] flink pull request: [FLINK-3464] [docs] Add SBT template documenta...

2016-02-24 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1688#issuecomment-188258580
  
Thanks for this and for fixing the SBT build; it should greatly help when 
setting up new users in demonstrations etc.

@tillrohrmann Any reason giter8 was selected as the default tab? Simply 
cloning the repo, without asking users to install a tool I've never seen 
before, makes more sense, doesn't it?


---


[GitHub] flink pull request: [FLINK-3330] [ml] Fix SparseVector support in ...

2016-02-04 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1587#discussion_r51909968
  
--- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala ---
@@ -27,6 +27,21 @@ object RegressionData {
   val expectedWeight0: Double = 9.8158
   val expectedSquaredResidualSum: Double = 49.7596/2
 
+  val sparseData: Seq[LabeledVector] = Seq(
+    new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
+    new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
+    new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
+
+  val expectedWeightsSparseInput = Array(0.5448906338353784, 0.15718880164669916,
+                                         0.034001300318125725, 0.38770183218867915, 0.0,
+                                         0.15718880164669916, 0.0, 0.0, 0.0, 0.15718880164669916)
--- End diff --

Indentation seems a bit off here.


---


[GitHub] flink pull request: [FLINK-3330] [ml] Fix SparseVector support in ...

2016-02-04 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/1587#discussion_r51910120
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -192,10 +190,18 @@ abstract class GradientDescent extends IterativeSolver {
           (left, right) =>
             val (leftGradVector, leftCount) = left
             val (rightGradVector, rightCount) = right
-            // Add the left gradient to the right one
-            BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights)
+
+            // make the left gradient dense so that the following reduce operations (left fold) reuse
+            // it. This strongly depends on the underlying implementation of the ReduceDriver
--- End diff --

Hey @tillrohrmann, could you explain what you mean by "strongly depends on the underlying implementation of the ReduceDriver"?
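
For intuition, here is a much-simplified sketch of the accumulator-reuse pattern the comment seems to describe, using plain Scala arrays instead of the PR's BLAS calls (`sumGradients` is an illustrative name, not Flink code). In a left fold, the dense left-hand accumulator can be mutated in place and returned at every step, which is only safe if the runtime really does fold from the left and reuses that object:

```scala
object GradientFoldSketch extends App {
  // Sums gradients by mutating the dense `left` accumulator in place,
  // so every reduce step reuses the same buffer (axpy-style addition).
  def sumGradients(gradients: Iterator[Array[Double]]): Array[Double] =
    gradients.reduceLeft { (left, right) =>
      var i = 0
      while (i < left.length) {
        left(i) += right(i)
        i += 1
      }
      left
    }

  println(sumGradients(Iterator(Array(1.0, 2.0), Array(3.0, 4.0))).mkString(", "))  // 4.0, 6.0
}
```

If the driver reduced in some other order, or did not reuse the left object, mutating it like this could double-count or clobber data, which I assume is the caveat the comment hints at.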


---


[GitHub] flink pull request: [FLINK-2003] [docs] Building on some encrypted...

2015-09-07 Thread thvasilo
GitHub user thvasilo opened a pull request:

https://github.com/apache/flink/pull/1100

[FLINK-2003] [docs] Building on some encrypted filesystems leads to "File 
name too long" error

Replaces #690, adding docs instead of changing the pom files.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thvasilo/flink encfs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1100


commit 0e1d83a58047a4af5a991e2c30bc8fc1398139fb
Author: Theodore Vasiloudis <t...@sics.se>
Date:   2015-09-07T14:25:58Z

Added instructions for encrypted filesystems




---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-05 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137936053
  
I agree, let's not break the API in this PR. We can create an issue and 
have a small discussion on the list about the change, and if the community 
agrees we can break the API in another PR specifically for that issue.


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137714631
  
Hello, my 2c: this PR should include docs and a wrapper for the Scala API. 
We could also do this in a separate issue, but it would be best if we merge it 
as a more complete package.


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137768373
  
Re 1: Good change; implicit classes are the way to go here, no need for an 
implicit conversion in the object.
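
As a quick illustration, here is a self-contained sketch of the implicit-class pattern (all names are made up for the example and are not the actual `DataSetUtils` code):

```scala
object RichSeqUtils {
  // The implicit class bundles the conversion and the extension methods in one definition.
  implicit class RichSeq[T](val self: Seq[T]) extends AnyVal {
    // Illustrative extension method, loosely in the spirit of the DataSetUtils helpers.
    def zipWithUniqueId: Seq[(Long, T)] =
      self.zipWithIndex.map { case (element, index) => (index.toLong, element) }
  }
}

object RichSeqDemo extends App {
  import RichSeqUtils._  // one import brings the extension methods into scope
  println(Seq("a", "b", "c").zipWithUniqueId)  // List((0,a), (1,b), (2,c))
}
```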


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137772917
  
I think it would be hard to do otherwise. `java.lang.Double`s are objects, 
but `scala.Double`s are not, from what I understand. In this case duplicating 
the code might not be so bad.
Perhaps @tillrohrmann has a better idea.
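
To make the distinction concrete, here is a tiny, self-contained sketch, independent of the Flink APIs (`BoxingSketch` is just an illustrative name). `scala.Double` compiles to the JVM primitive `double`, so values must be boxed explicitly before a Java API expecting `java.lang.Double` can accept them:

```scala
object BoxingSketch extends App {
  val primitives: Seq[Double] = Seq(1.0, 2.0, 3.0)  // scala.Double == primitive double

  // Double.box yields java.lang.Double, equivalent to mapping with new java.lang.Double(_).
  val boxed: Seq[java.lang.Double] = primitives.map(Double.box)

  println(boxed.head.getClass)  // class java.lang.Double
}
```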


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137764885
  
```scala
def createDiscreteHistogram: DataSet[DiscreteHistogram] = {
  wrap(jutils.DataSetUtils.createDiscreteHistogram(
    self.map(x => new java.lang.Double(x)).javaSet))
}
```

Would something like this work?


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137775231
  
Also @tillrohrmann: is changing the DataSetUtils structure considered an 
API-breaking change, and should we handle it differently?

Any code using `import org.apache.flink.api.scala.DataSetUtils.utilsToDataSet` 
must now be changed to use `import org.apache.flink.api.scala.DataSetUtils._`.


---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-137759354
  
@sachingoel0101 That sounds like a good place for the docs; that 
`zip_elements_guide.md` is linked from the DataSet Transformations doc, so 
replacing that link with a more general "DataSet Utilities" doc would make sense.

I'm looking into the wrapper for Scala; it's a bit more convoluted than I 
thought, because `scala.Double` is the Java primitive `double`, so the 
conversion is not straightforward.





---


[GitHub] flink pull request: [FLINK-2030][FLINK-2274][core][utils]Histogram...

2015-09-04 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-13090
  
If it is actually considered API-breaking then we should open a new PR and 
JIRA to make sure the change is well documented. Let's see what Till thinks, 
and I can create the JIRA and PR if needed.


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-22 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133675023
  
It's great to have this in, I'll try to update the cross-validation and SGD 
to use this.


---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-19 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-132552269
  
Sounds good, just make sure to update the title and description of both PRs 
to reflect the current state.


---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-18 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-132146208
  
Sounds good. 


---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-18 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-132261492
  
Sorry to nitpick, but if we are going to split the PR then the documentation 
should be split accordingly; we can merge the column-wise statistics once this 
one is merged. I know this is taking very long to merge, but it is better to 
do things properly and not rush them.


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-14 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37054584
  
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+    ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+    TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+    verifySamplerWithFractionWithoutReplacement(0d)
+    verifySamplerWithFractionWithoutReplacement(0.2d)
+    verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+    verifySamplerWithFractionWithReplacement(0d)
+    verifySamplerWithFractionWithReplacement(0.2d)
+    verifySamplerWithFractionWithReplacement(1.0d)
+    verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+    verifySamplerWithFixedSizeWithoutReplacement(0)
+    verifySamplerWithFixedSizeWithoutReplacement(2)
+    verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+    verifySamplerWithFixedSizeWithReplacement(0)
+    verifySamplerWithFixedSizeWithReplacement(2)
+    verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: Double) {
+    verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) {
+    verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+    verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) {
+    verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) {
+    val ds = getSourceDataSet()
+    val sampled = ds.sample(withReplacement, fraction, seed)
+    result = sampled.collect.asJava
--- End diff --

Thanks, I missed that.


---


[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...

2015-08-14 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/696#issuecomment-131107449
  
+1 for closing this and focusing on approximate kNN instead.


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955515
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+
+    private final double fraction;
+    private final Random random;
+
+    /**
+     * Create a bernoulli sampler sample fraction and default random number generator.
--- End diff --

*B*ernoulli should be capitalized for all mentions.
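
As background for other readers, the scheme itself fits in a few lines; this is an illustrative Scala sketch of fraction-based Bernoulli sampling, not the Java code under review:

```scala
import scala.util.Random

// Keep each input element independently with probability `fraction`
// (sampling without replacement at a fixed expected rate).
def bernoulliSample[T](input: Iterator[T], fraction: Double,
                       rng: Random = new Random()): Iterator[T] = {
  require(fraction >= 0.0 && fraction <= 1.0, s"fraction must be in [0, 1], got $fraction")
  input.filter(_ => rng.nextDouble() < fraction)
}

// e.g. bernoulliSample(Iterator.range(0, 1000), 0.2).size is ~200 on average
```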


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960527
  
--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
             assertEquals(extectedStrings[i], resultStrings[i]);
         }
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Comparison methods for tests using sample
+    // --------------------------------------------------------------------------------------------
+
+    public static <T> void containsResultAsTuples(List<T> result, String expected) {
+        isExpectedContainsResult(result, expected, true);
+    }
+
+    public static <T> void containsResultAsText(List<T> result, String expected) {
+        isExpectedContainsResult(result, expected, false);
+    }
+
+    private static <T> void isExpectedContainsResult(List<T> result, String expected, boolean asTuple) {
--- End diff --

Can we get comments explaining the functionality of this and 
`containsResultAsText`?


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960080
  
--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
             assertEquals(extectedStrings[i], resultStrings[i]);
         }
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Comparison methods for tests using sample
+    // --------------------------------------------------------------------------------------------
+
+    public static <T> void containsResultAsTuples(List<T> result, String expected) {
--- End diff --

Is this used anywhere?


---


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-08-13 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36958345
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+
+    private final double fraction;
+    private final Random random;
+
+    /**
+     * Create a bernoulli sampler sample fraction and default random number generator.
+     *
+     * @param fraction Sample fraction, aka the bernoulli sampler possibility.
+     */
+    public BernoulliSampler(double fraction) {
+        this(fraction, new Random());
+    }
+
+    /**
+     * Create a bernoulli sampler sample fraction and random number generator seed.
+     *
+     * @param fraction Sample fraction, aka the bernoulli sampler possibility.
+     * @param seed Random number generator seed.
+     */
+    public BernoulliSampler(double fraction, long seed) {
+        this(fraction, new Random(seed));
+    }
+
+    /**
+     * Create a bernoulli sampler sample fraction and random number generator.
+     *
+     * @param fraction Sample fraction, aka the bernoulli sampler possibility.
+     * @param random   The random number generator.
+     */
+    public BernoulliSampler(double fraction, Random random) {
+        Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1].");
+        this.fraction = fraction;
+        this.random = random;
+    }
+
+    /**
+     * Sample the input elements, for each input element, take a Bernoulli Trail for sample.
+     *
+     * @param input Elements to be sampled.
+     * @return The sampled result which is lazy computed upon input elements.
+     */
+    @Override
+    public Iterator<T> sample(final Iterator<T> input) {
+        if (fraction == 0) {
+            return EMPTY_ITERABLE;
+        }
+
+        return new SampledIterator<T>() {
+            T current;
+
+            @Override
+            public boolean hasNext() {
+                if (current == null) {
+                    while (input.hasNext()) {
+                        T element = input.next();
+                        if (random.nextDouble() <= fraction) {
+                            current = element;
+                            return true;
+                        }
+                    }
+                    current = null;
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+
+            @Override
+            public T next() {
--- End diff --

It feels a bit counterintuitive that the next element is prepared in the 
`hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called 
every time before we call `next()`?

Can we protect against the case where we would get a `null` element back 
that way?
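
For what it's worth, a minimal Scala sketch of the guard I have in mind (not the PR's Java code; names are illustrative): `next()` delegates to `hasNext()` itself, so a caller that skips `hasNext()` gets a proper `NoSuchElementException` instead of a stale or `null` element:

```scala
import java.util.NoSuchElementException

// Buffered-lookahead iterator: hasNext() pulls ahead until an element passes
// `keep`, and next() calls hasNext() itself before handing the element out.
class LookaheadIterator[T](underlying: Iterator[T], keep: T => Boolean) extends Iterator[T] {
  private var current: Option[T] = None

  override def hasNext: Boolean = {
    while (current.isEmpty && underlying.hasNext) {
      val element = underlying.next()
      if (keep(element)) current = Some(element)
    }
    current.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more sampled elements")
    val result = current.get
    current = None  // consume the buffered element
    result
  }
}

// e.g. val rng = new scala.util.Random(seed = 42L)
//      new LookaheadIterator(Iterator.range(0, 100), (_: Int) => rng.nextDouble() <= 0.2)
```

Whether the PR's `SampledIterator` should do exactly this is up for discussion; the point is only that the lookahead pattern stays correct without relying on caller discipline.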


---