[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2761


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r124667752
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
 ---
@@ -0,0 +1,136 @@
+package org.apache.flink.streaming.scala.examples
--- End diff --

AL2 header missing
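For reference, the missing header is the standard ASF AL2 license block already carried by the other files in this PR (it is quoted verbatim in the Scala sources later in this thread); it would be prepended above the `package` declaration:
```
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
```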




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r124658721
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 ---
@@ -126,4 +94,40 @@ public void apply(Long key, Window window, Iterable> values,
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
+
+   /**
+* Parallel data source that serves a list of key-value pair.
--- End diff --

+s -> `... list of key-value pairs.`




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r124658613
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 ---
@@ -34,49 +34,17 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
- * Example of grouped processing time windows.
+ * An example of grouped stream windowing into sliding time windows.
+ * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pair.
--- End diff --

+s -> `... list of key-value pairs.`




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r124667797
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
 ---
@@ -0,0 +1,135 @@
+package org.apache.flink.streaming.test;
--- End diff --

AL2 header missing




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r124658452
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 ---
@@ -70,9 +70,7 @@ public void run(SourceContext> ctx) throws Excepti
for (Tuple3 value : input) {

ctx.collectWithTimestamp(value, value.f1);
ctx.emitWatermark(new Watermark(value.f1 - 1));
-   if (!fileOutput) {
-   System.out.println("Collected: " + value);
-   }
+   System.out.println("Collected: " + value);
--- End diff --

remove the `println` as well




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205993
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
--- End diff --

Scaladocs are commented with 
```
/**
 *
 */
```
instead of 
```
/**
  *
  */
```




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205961
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
--- End diff --

The use of HTML is discouraged in Scaladoc. Instead use wiki markup 
(markdown) whenever possible.
See [Scaladoc guidelines](http://docs.scala-lang.org/style/scaladoc.html).
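For illustration, a sketch of the same Scaladoc written without HTML tags, using the asterisk alignment requested in the previous comment and a plain markdown-style list (the wording is taken from the IterateExample doc quoted later in this thread):
```
/**
 * Example illustrating iterations in Flink streaming.
 *
 * The program sums up random numbers and counts additions it performs to
 * reach a specific threshold in an iterative streaming fashion.
 *
 * This example shows how to use:
 *
 *  - streaming iterations,
 *  - buffer timeout to enhance latency,
 *  - directed outputs.
 */
```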




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658391
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing into sliding time windows.
+  * This example uses [[RichParallelSourceFunction]] to generate a list of 
key-value pair.
+  */
+object GroupedProcessingTimeWindowExample {
+
+  def main(args: Array[String]): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(4)
+
+val stream = env.addSource(new RichParallelSourceFunction[(Long, 
Long)]() {
--- End diff --

Move the source function to a separate class? It "hides" the important 
aspects of the example.
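A minimal sketch of such an extraction, assuming a hypothetical `GroupedProcessingTimeWindowSource` name and a placeholder key/value generation loop (the body of the anonymous source is not shown in the quoted diff):
```
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Parallel source emitting (key, value) pairs. The generation logic below is
 * only a placeholder; the quoted diff does not show the anonymous source's body.
 */
class GroupedProcessingTimeWindowSource extends RichParallelSourceFunction[(Long, Long)] {

  @volatile private var running = true

  override def run(ctx: SourceContext[(Long, Long)]): Unit = {
    var value = 1L
    while (running) {
      // 100 keys, monotonically increasing values (assumed)
      ctx.collect((value % 100, value))
      value += 1
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

// in main: val stream = env.addSource(new GroupedProcessingTimeWindowSource)
```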




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657755
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
+
+override def run(ctx: SourceContext[Int]) = {
+  Thread.sleep(15)
+  while (counter < 50) {
+ctx.collect(getNewData)
+  }
+}
+
+def getNewData = {
+  

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206069
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
--- End diff --

I find it helpful to add the type of the DataStream.
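For example, with the element type spelled out (the `(Int, Int)` type is read off the parsing code quoted later in this thread, so treat it as an inference):
```
// explicit element type makes the stream's contents obvious to readers
val inputStream: DataStream[(Int, Int)] =
  if (params.has("input")) {
    env.readTextFile(params.get("input")).map { value: String =>
      val record = value.substring(1, value.length - 1)
      val splitted = record.split(",")
      (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
    }
  } else {
    env.addSource(new RandomFibonacciSource)
  }
```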




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207859
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
+  val jsonParser = new ObjectMapper()
--- End diff --

Creating a new `ObjectMapper()` in each function call is quite expensive
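A common way to avoid that cost, sketched here under the assumption that the tokenizing logic moves into its own function class (not necessarily what the PR ended up doing), is to create the mapper once per subtask in `open()` of a `RichFlatMapFunction`:
```
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.codehaus.jackson.JsonNode
import org.codehaus.jackson.map.ObjectMapper

/** Tokenizer that creates the Jackson ObjectMapper once per subtask, not per record. */
class SelectEnglishAndTokenizeFlatMap extends RichFlatMapFunction[String, (String, Int)] {

  @transient private var jsonParser: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    jsonParser = new ObjectMapper()
  }

  override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
    val jsonNode: JsonNode = jsonParser.readValue(value, classOf[JsonNode])
    // language check and word splitting as in the inline function quoted above
    // (that part of the body is cut off in this thread, so it is not reproduced here)
  }
}
```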




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206812
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
--- End diff --

please verify Scaladocs style.




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207774
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
--- End diff --

add DataStream type




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207663
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
--- End diff --

Scaladocs style




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97654855
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/iteration/IterateExampleITCase.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.iteration;
+
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample;
+import 
org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IterateExampleITCase extends StreamingProgramTestBase {
--- End diff --

ITCases extending `StreamingProgramTestBase` are very expensive because each one internally starts a Flink minicluster, which takes a significant amount of time, usually much more than the actual test.
The class `StreamingMultipleProgramsTestBase` allows reusing the minicluster across several tests.
I would suggest porting all existing example tests (Java and Scala) into a single ITCase that extends `StreamingMultipleProgramsTestBase`. This should reduce Flink's build time.
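A hedged sketch of what such a consolidated ITCase could look like, assuming `StreamingMultipleProgramsTestBase` lives next to `StreamingProgramTestBase` in `org.apache.flink.streaming.util` and that the Scala examples accept an `--output` argument like their Java counterparts:
```
package org.apache.flink.streaming.scala.examples

import java.nio.file.Files

import org.apache.flink.streaming.scala.examples.iteration.IterateExample
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test

/**
 * Runs several Scala example jobs against the single mini cluster started by
 * the shared test base, instead of one cluster per example.
 */
class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testIterateExample(): Unit = {
    // write the example output into a temp directory; result assertions
    // would follow the existing per-example ITCases and are omitted here
    val resultPath = Files.createTempDirectory("iterate-example").resolve("result").toString
    IterateExample.main(Array("--output", resultPath))
  }

  // ... one @Test method per remaining example
}
```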





[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207844
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
--- End diff --

I think this complex function should be moved into a separate class.
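With the function extracted (for example into the `RichFlatMapFunction` sketched earlier in this thread for the `ObjectMapper` comment), the pipeline itself stays short and can also carry the `DataStream` type requested in the earlier comment; a sketch using that hypothetical class name:
```
// (String, Int) follows from the "(word, 1)" comment above
val tweets: DataStream[(String, Int)] = streamSource
  .flatMap(new SelectEnglishAndTokenizeFlatMap)
```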




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657374
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
+
+override def run(ctx: SourceContext[Int]) = {
+  Thread.sleep(15)
--- End diff --

can be simplified to
```
Thread.sleep(15)
(0 until 50).foreach { _ =>
  Thread.sleep(5)
  ctx.collect(getNewData)
}
```

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97651600
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+import org.apache.flink.streaming.api.scala._
+
+/**
+  * Implements the "WordCount" program that computes a simple word 
occurrence
--- End diff --

Scaladocs




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206987
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
--- End diff --

please add type
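For example (the `Array[Double]` element type is an inference from the `(_: Array[Double]) => 1` function in the connect/map call quoted elsewhere in this thread, so treat it as an assumption):
```
val model: DataStream[Array[Double]] = trainingData
  .assignTimestampsAndWatermarks(new LinearTimestamp)
  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
  .apply(new PartialModelBuilder)
```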




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207217
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
--- End diff --

I agree with @thvasilo. We should copy the code of the Java job. 

Otherwise, this example just demonstrates how to use `connect()` and 
`CoMapFunction`. 
For that we would not need custom sources and window aggregation.
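A sketch of what "copying the Java job" could mean here: a dedicated `CoMapFunction` that keeps the latest partial model from one input and uses it to answer the new-data input, instead of the two constant lambdas (the prediction logic itself is only a placeholder):
```
import org.apache.flink.streaming.api.functions.co.CoMapFunction

/**
 * Sketch of a predictor in the spirit of the Java job: the second input
 * delivers (partial) models, the first input delivers new data points that
 * are answered with the latest model. The prediction itself is a placeholder.
 */
class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  // latest partial model received so far (assumed shape)
  private var partialModel: Array[Double] = _

  // new data point: emit a (dummy) prediction based on the current model
  override def map1(value: Int): Int = {
    if (partialModel != null) 1 else 0
  }

  // new partial model: remember it and emit a marker value
  override def map2(model: Array[Double]): Int = {
    partialModel = model
    1
  }
}

// instead of the two constant lambdas:
// val prediction: DataStream[Int] = newData.connect(model).map(new Predictor)
```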




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206373
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
+if (params.has("input")) {
+  env.readTextFile(params.get("input")).map { value: String =>
+val record = value.substring(1, value.length - 1)
+val splitted = record.split(",")
+(Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
+  }
+} else {
+  println("Executing Iterate example with default input data set.")
+  println("Use --input to specify file input.")
+  env.addSource(new RandomFibonacciSource)
+}
+
+def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < 
Bound
+
+// create an iterative data stream from the input with 5 second timeout
+val numbers = inputStream
+  // Map the inputs so that the next Fibonacci numbers can be 
calculated
+  // while preserving the original input tuple
+  // A counter is attached to the tuple and incremented in every 
iteration step
+  .map(value => (value._1, value._2, value._1, value._2, 0))
+  .iterate(
+(iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
+  // calculates the next Fibonacci number and increment the counter
+  val step = iteration.map(value =>
+(value._1, value._2, value._4, value._3 + value._4, value._5 + 
1))
+  // testing which tuple needs to be iterated again
+  val feedback = step.filter(value => withinBound(value._3, 
value._4))
+  // get the input pairs that have the greatest iteration counter
--- End diff --

Please check this comment. I do not see a sliding window or how the 
greatest iteration counter is identified.




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658665
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing in session windows with session timeout of 3 msec.
+  * A source fetches elements with key, timestamp, and count.
+  */
+object SessionWindowing {
+
+  def main(args: Array[String]): Unit = {
+
+val params = ParameterTool.fromArgs(args)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+env.getConfig.setGlobalJobParameters(params)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+
+val fileOutput = params.has("output")
+
+val input = List(
+  ("a", 1L, 1),
+  ("b", 1L, 1),
+  ("b", 3L, 1),
+  ("b", 5L, 1),
+  ("c", 6L, 1),
+  // We expect to detect the session "a" earlier than this point (the old
+  // functionality can only detect here when the next starts)
+  ("a", 10L, 1),
+  // We expect to detect session "b" and "c" at this point as well
+  ("c", 11L, 1)
+)
+
+val source = env.addSource(new SourceFunction[(String, Long, Int)]() {
+
+  override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
+input.foreach(value => {
+  ctx.collectWithTimestamp(value, value._2)
+  ctx.emitWatermark(new Watermark(value._2 - 1))
+  if (!fileOutput) {
--- End diff --

I'd remove this condition. Not sure how much value the printed values add 
to the example.
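
For comparison, here is a rough sketch of how the source and the downstream windowing could look with that condition dropped: the source only emits records and watermarks, and the choice between file and stdout output stays at the sink. The names env, input, params and fileOutput follow the diff above; the keyBy/sum aggregation and the final watermark are assumptions about where the example is heading, not the PR's final code.

    // source without the fileOutput check inside run()
    val source = env.addSource(new SourceFunction[(String, Long, Int)]() {

      override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
        input.foreach { value =>
          // emit each element with its event timestamp and advance the watermark
          ctx.collectWithTimestamp(value, value._2)
          ctx.emitWatermark(new Watermark(value._2 - 1))
        }
        // a final watermark lets the last open sessions close
        ctx.emitWatermark(new Watermark(Long.MaxValue))
      }

      override def cancel(): Unit = {}
    })

    // key by the String field and sum the counts per session (3 ms inactivity gap)
    val aggregated = source
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
      .sum(2)

    // keep the file-vs-stdout decision at the sink instead of inside the source
    if (fileOutput) {
      aggregated.writeAsText(params.get("output"))
    } else {
      aggregated.print()
    }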




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206996
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new input data
+  * for which the job provides predictions.
+  *
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * This example shows how to use:
+  *  - Connected streams
+  *  - CoFunctions
+  *  - Tuple data types
+  */
+object IncrementalLearningSkeleton {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
--- End diff --

Please add explicit types to the DataStream declarations.
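
Concretely, the requested change could look like the lines below. DataStream[Int] is an assumption about what these sources emit, and FiniteNewDataSource is assumed to exist alongside FiniteTrainingDataSource; neither the element type nor that name appears in the quoted diff.

    // also needs: import org.apache.flink.streaming.api.scala.DataStream

    // build new model on every second of new data
    val trainingData: DataStream[Int] = env.addSource(new FiniteTrainingDataSource)

    // stream of points to predict on (assumed companion source)
    val newData: DataStream[Int] = env.addSource(new FiniteNewDataSource)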




[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206112
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  * The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming fashion.
+  *
+  * This example shows how to use:
+  *  - streaming iterations,
+  *  - buffer timeout to reduce latency,
+  *  - directed outputs.
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain the execution environment and set the buffer timeout to 1 ms to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
+if (params.has("input")) {
+  env.readTextFile(params.get("input")).map { value: String =>
--- End diff --

Please add a comment describing what the map function does
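
Based on what the map body in the diff actually does, the requested comment could read roughly as follows (a sketch of possible wording, not the PR's final text):

    // Each text line is expected to look like "(3,5)": strip the surrounding
    // parentheses, split the remainder on ',' and parse both fields as Int,
    // producing one (Int, Int) input pair per line.
    env.readTextFile(params.get("input")).map { value: String =>
      val record = value.substring(1, value.length - 1)
      val splitted = record.split(",")
      (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
    }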

