[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069671#comment-16069671 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2761

    No worries :-)

> Sync Scala and Java Streaming Examples
> --
>
>                 Key: FLINK-3551
>                 URL: https://issues.apache.org/jira/browse/FLINK-3551
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Examples
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Lim Chee Hau
>             Fix For: 1.4.0
>
>
> The Scala Examples lag behind the Java Examples

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069632#comment-16069632 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user ch33hau commented on the issue:

    https://github.com/apache/flink/pull/2761

    @fhueske Sorry for the problems you had to point out. I can see the pain now and will avoid making the same mistakes. Thanks for your time, I really appreciate it.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069059#comment-16069059 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2761
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068148#comment-16068148 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r124667752

    --- Diff: flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala ---
    @@ -0,0 +1,136 @@
    +package org.apache.flink.streaming.scala.examples
    --- End diff --

    The Apache License 2.0 (AL2) header is missing.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068150#comment-16068150 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r124667797

    --- Diff: flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java ---
    @@ -0,0 +1,135 @@
    +package org.apache.flink.streaming.test;
    --- End diff --

    The Apache License 2.0 (AL2) header is missing.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068152#comment-16068152 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r124658613

    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---
    @@ -34,49 +34,17 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;

     /**
    - * Example of grouped processing time windows.
    + * An example of grouped stream windowing into sliding time windows.
    + * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pair.
    --- End diff --

    Add an "s": `... list of key-value pairs.`
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068151#comment-16068151 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r124658452

    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---
    @@ -70,9 +70,7 @@ public void run(SourceContext> ctx) throws Excepti
         for (Tuple3 value : input) {
             ctx.collectWithTimestamp(value, value.f1);
             ctx.emitWatermark(new Watermark(value.f1 - 1));
    -        if (!fileOutput) {
    -            System.out.println("Collected: " + value);
    -        }
    +        System.out.println("Collected: " + value);
    --- End diff --

    remove the `println` as well
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068149#comment-16068149 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2761#discussion_r124658721

    --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---
    @@ -126,4 +94,40 @@ public void apply(Long key, Window window, Iterable> values,
                 return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
             }
         }
    +
    +    /**
    +     * Parallel data source that serves a list of key-value pair.
    --- End diff --

    Add an "s": `... list of key-value pairs.`
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16067314#comment-16067314 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2761

    Thanks for the update @ch33hau.
    I made a quick pass over the PR and it looks quite good. I will have a more detailed look in the next few days and probably merge it.

    Thanks for porting the examples and refactoring the tests!
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16052711#comment-16052711 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user ch33hau commented on the issue:

    https://github.com/apache/flink/pull/2761

    Hi @fhueske, this is ready for review.

    I have moved the Java IT cases to `org.apache.flink.streaming.test.StreamingExamplesITCase` and the Scala IT cases to `org.apache.flink.streaming.scala.examples.StreamingExamplesITCase`.

    I split Java and Scala into two files because:
    - there are some Scala tests that I couldn't test in Java code, e.g. `testWindowJoin`.
    - grouping both Java and Scala IT cases into a single file is confusing, especially regarding the package names and the class name.

    Thanks
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16045626#comment-16045626 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user ch33hau commented on the issue:

    https://github.com/apache/flink/pull/2761

    @fhueske very sorry about this decade-long delay... I was working on some relocation stuff for the past few months.

    I have rebased and pushed a commit that addresses the PR comments:
    - Remove HTML in Scaladoc
    - Add type for DataStream
    - All the comments you have posted

    The one exception is moving all Java and Scala IT cases to a single `StreamingMultipleProgramsTestBase`; I'm working on this right now.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837415#comment-15837415 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2761

    @ch33hau, sure!
    Just drop a comment once you have updated the PR (a simple push does not trigger a notification, but a comment does).

    Thanks, Fabian
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836734#comment-15836734 ]

ASF GitHub Bot commented on FLINK-3551:
---

Github user ch33hau commented on the issue:

    https://github.com/apache/flink/pull/2761

    @fhueske it is ok, and thanks for the review =)

    I can't fix them right now, but I might start working on your comments in a day.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836689#comment-15836689 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206069 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. 
    + *
    + *
    + * This example shows how to use:
    + *
    + * streaming iterations,
    + * buffer timeout to enhance latency,
    + * directed outputs.
    + *
    + *
    + */
    +object IterateExample {
    +
    +  private val Bound = 100
    +
    +  def main(args: Array[String]): Unit = {
    +    // Checking input parameters
    +    val params = ParameterTool.fromArgs(args)
    +
    +    // obtain execution environment and set setBufferTimeout to 1 to enable
    +    // continuous flushing of the output buffers (lowest latency)
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
    +
    +    // make parameters available in the web interface
    +    env.getConfig.setGlobalJobParameters(params)
    +
    +    // create input stream of integer pairs
    +    val inputStream =
    --- End diff --

    I find it helpful to add the type of the DataStream.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836698#comment-15836698 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206112 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. 
    + *
    + *
    + * This example shows how to use:
    + *
    + * streaming iterations,
    + * buffer timeout to enhance latency,
    + * directed outputs.
    + *
    + *
    + */
    +object IterateExample {
    +
    +  private val Bound = 100
    +
    +  def main(args: Array[String]): Unit = {
    +    // Checking input parameters
    +    val params = ParameterTool.fromArgs(args)
    +
    +    // obtain execution environment and set setBufferTimeout to 1 to enable
    +    // continuous flushing of the output buffers (lowest latency)
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
    +
    +    // make parameters available in the web interface
    +    env.getConfig.setGlobalJobParameters(params)
    +
    +    // create input stream of integer pairs
    +    val inputStream =
    +      if (params.has("input")) {
    +        env.readTextFile(params.get("input")).map { value: String =>
    --- End diff --

    Please add a comment describing what the map function does.
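A comment of the kind requested here could look roughly like the sketch below. It is plain Scala with no Flink dependency and rests on an assumption that the quoted diff does not confirm: that each input line has the form `(a,b)`. The object name `ParsePairsSketch` and the sample lines are hypothetical. The explicit `Seq[(Int, Int)]` annotation also shows the style asked for in the earlier remark about stating the stream's type.

```scala
object ParsePairsSketch {

  // Hypothetical sample input; the reviewed example reads its lines from a text file.
  val lines: Seq[String] = Seq("(1,2)", "(3,5)")

  // Explicit result type, so the element type is visible at the declaration.
  val pairs: Seq[(Int, Int)] = lines.map { (value: String) =>
    // Strip the surrounding parentheses, split on the comma,
    // and parse both fields as integers: "(1,2)" becomes (1, 2).
    // ASSUMPTION: every line is well formed; malformed input would throw.
    val fields = value.substring(1, value.length - 1).split(",")
    (fields(0).trim.toInt, fields(1).trim.toInt)
  }

  def main(args: Array[String]): Unit = pairs.foreach(println)
}
```

The comment sits inside the function body, next to the parsing steps it describes, so it stays accurate if the surrounding pipeline changes.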
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836701#comment-15836701 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97657374 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, + (_: Array[Double]) => 1 +) + +// emit result +if (params.has("output")) { + prediction.writeAsText(params.get("output")) +} else { + println("Printing result to stdout. Use --output to specify output path.") + prediction.print() +} + +// execute program +env.execute("Streaming Incremental Learning") + } + + // * + // USER FUNCTIONS + // * + + /** +* Feeds new data for newData. By default it is implemented as constantly +* emitting the Integer 1 in a loop. +*/ + private class FiniteNewDataSource extends SourceFunction[Int] { +var counter: Int = 0
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836684#comment-15836684 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97205993 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. 
    --- End diff --

    Scaladocs are commented with
    ```
    /**
      *
      */
    ```
    instead of
    ```
    /**
     *
     */
    ```
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836686#comment-15836686 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207663 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
    + */
    +
    +package org.apache.flink.streaming.scala.examples.twitter
    +
    +import java.util.StringTokenizer
    +
    +import org.apache.flink.api.java.utils.ParameterTool
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.connectors.twitter.TwitterSource
    +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
    +import org.codehaus.jackson.JsonNode
    +import org.codehaus.jackson.map.ObjectMapper
    +
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    + * Implements the "TwitterStream" program that computes a most used word
    --- End diff --

    Scaladocs style
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836692#comment-15836692 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206987 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
    + *
    + *
    + * This example shows how to use:
    + *
    + * Connected streams
    + * CoFunctions
    + * Tuple data types
    + *
    + */
    +object IncrementalLearningSkeleton {
    +
    +  // *
    +  // PROGRAM
    +  // *
    +
    +  def main(args: Array[String]): Unit = {
    +    // Checking input parameters
    +    val params = ParameterTool.fromArgs(args)
    +
    +    // set up the execution environment
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +
    +    // build new model on every second of new data
    +    val trainingData = env.addSource(new FiniteTrainingDataSource)
    +    val newData = env.addSource(new FiniteNewDataSource)
    +
    +    val model = trainingData
    --- End diff --

    please add type
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836683#comment-15836683 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97205961 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions --- End diff -- The use of HTML is discouraged in Scaladoc. Instead use wiki markup (markdown) whenever possible. 
See [Scaladoc guidelines](http://docs.scala-lang.org/style/scaladoc.html). > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > > The Scala Examples lag behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
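As a minimal sketch of what the guideline asks for, the Scaladoc of `IterateExample` could be written with wiki markup only: a blank comment line separates paragraphs and ` - ` starts a list item, so no HTML tags are needed. The wording is taken from the diff under review; the empty `main` body is only a placeholder for the job definition.

```scala
/**
 * Example illustrating iterations in Flink streaming.
 *
 * The program sums up random numbers and counts additions
 * it performs to reach a specific threshold in an iterative streaming fashion.
 *
 * This example shows how to use:
 *
 *  - streaming iterations,
 *  - buffer timeout to enhance latency,
 *  - directed outputs.
 */
object IterateExample {

  def main(args: Array[String]): Unit = {
    // placeholder: the job definition is omitted in this sketch
  }
}
```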
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836693#comment-15836693 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207859 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. 
+ * + * The input is a Tweet stream from a TwitterSource. + * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." 
+ ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource + // selecting English tweets and splitting to (word, 1) + .flatMap { value: String => + val jsonParser = new ObjectMapper() --- End diff -- Creating a new `ObjectMapper()` in each function call is quite expensive
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836695#comment-15836695 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207844 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. 
+ * + * The input is a Tweet stream from a TwitterSource. + * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." + ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource + // selecting English tweets and splitting to (word, 1) + .flatMap { value: String => --- End diff -- I think this complex function should be moved into a separate class. 
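One way to address both remarks on the `flatMap` in `TwitterExample` (the per-call `ObjectMapper` and the size of the in-line function) is sketched below. It is not the code from the pull request: the class name `SelectEnglishAndTokenizeFlatMap` is hypothetical, and the JSON field names `user`, `lang` and `text` are assumptions about the tweet format. The parser is held in a `lazy val`, so it is created once per function instance rather than once per record.

```scala
import java.util.StringTokenizer

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.codehaus.jackson.JsonNode
import org.codehaus.jackson.map.ObjectMapper

/**
 * Hypothetical helper (name chosen for illustration): selects English tweets
 * and emits a (word, 1) pair for every token of the tweet text.
 */
class SelectEnglishAndTokenizeFlatMap extends FlatMapFunction[String, (String, Int)] {

  // Created lazily on first use and then reused for every record.
  // @transient keeps the parser out of the serialized function object.
  @transient private lazy val jsonParser = new ObjectMapper()

  override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
    val node = jsonParser.readValue(value, classOf[JsonNode])

    // Assumed tweet layout: { "user": { "lang": "en" }, "text": "..." }
    val isEnglish = node.has("user") &&
      node.get("user").has("lang") &&
      node.get("user").get("lang").getTextValue == "en"

    if (isEnglish && node.has("text")) {
      // getTextValue returns null for non-textual nodes, hence the Option wrapper
      Option(node.get("text").getTextValue).foreach { text =>
        val tokenizer = new StringTokenizer(text)
        while (tokenizer.hasMoreTokens) {
          out.collect((tokenizer.nextToken().toLowerCase, 1))
        }
      }
    }
  }
}
```

With `org.apache.flink.api.scala._` imported for the implicit type information, the job would then read `streamSource.flatMap(new SelectEnglishAndTokenizeFlatMap)`, which keeps `main` focused on the pipeline itself.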
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836690#comment-15836690 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97654855 --- Diff: flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/iteration/IterateExampleITCase.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleScalaPrograms.iteration; + +import org.apache.flink.streaming.scala.examples.iteration.IterateExample; +import org.apache.flink.streaming.examples.iteration.util.IterateExampleData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IterateExampleITCase extends StreamingProgramTestBase { --- End diff -- ITCases extending `StreamingProgramTestBase` are very expensive because they internally start a Flink minicluster which takes a significant amount of time, usually much more than the actual test. The class `StreamingMultipleProgramsTestBase` allows to reuse the minicluster across several tests. 
I suggest porting all existing example tests (Java and Scala) into a single ITCase that extends `StreamingMultipleProgramsTestBase`. This should reduce Flink's build time.
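A rough sketch of such a combined test is shown below. It rests on several assumptions: that `StreamingMultipleProgramsTestBase` sits in the same `org.apache.flink.streaming.util` package as `StreamingProgramTestBase`, that the examples fall back to their default data when started without arguments, and that the class name `StreamingExamplesITCase` is free to use. It only runs the programs and does not verify their output, which a real test would still have to do.

```scala
import org.apache.flink.streaming.scala.examples.windowing.WindowWordCount
import org.apache.flink.streaming.scala.examples.wordcount.WordCount
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test

/**
 * Hypothetical combined ITCase: the test base is assumed to start one
 * minicluster for the whole class, so every test method reuses it.
 */
class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testWordCount(): Unit = {
    // runs the example with its built-in default input (assumption)
    WordCount.main(Array.empty[String])
  }

  @Test
  def testWindowWordCount(): Unit = {
    // second program executed against the same shared minicluster
    WindowWordCount.main(Array.empty[String])
  }
}
```

Each further example would become one more `@Test` method, so the cluster start-up cost is paid once per class instead of once per example.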
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836699#comment-15836699 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207217 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, --- End diff -- I agree with @thvasilo. We should copy the code of the Java job. Otherwise, this example just demonstrates how to use `connect()` and `CoMapFunction`. For that we would not need custom sources and window aggregation.
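To illustrate the difference between two stateless lambdas and a function that actually uses the model, here is a hedged sketch of a stateful `CoMapFunction`. It is not the code of the Java job: the class name `Predictor`, the model type `Array[Double]` and the placeholder prediction logic are assumptions made for illustration.

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction

/**
 * Hypothetical predictor: map1 receives new data points, map2 receives model
 * updates. The latest model is kept as state between calls.
 */
class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  // Most recent partial model seen on the second input (empty until the first update).
  private var partialModel: Array[Double] = Array.empty

  // Called for each element of the first stream: emit a prediction.
  override def map1(value: Int): Int = predict(value)

  // Called for each element of the second stream: remember the new model.
  override def map2(model: Array[Double]): Int = {
    partialModel = model
    1 // placeholder value signalling that the model was updated
  }

  // Placeholder: a real job would combine `value` with `partialModel` here.
  private def predict(value: Int): Int = 0
}
```

Assuming the window function emits `Array[Double]`, the job would call `newData.connect(model).map(new Predictor)`. The custom sources and the window aggregation then serve a purpose, because the predictions depend on the model built from the training stream.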
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836696#comment-15836696 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97650398 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.windowing + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.examples.java.wordcount.util.WordCountData +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment + +/** + * Implements a windowed version of the streaming "WordCount" program. 
--- End diff -- Scaladocs
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836702#comment-15836702 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97651600 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.wordcount + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.examples.java.wordcount.util.WordCountData +import org.apache.flink.streaming.api.scala._ + +/** + * Implements the "WordCount" program that computes a simple word occurrence --- End diff -- Scaladocs > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836694#comment-15836694 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97658665 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.windowing + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * An example of grouped stream windowing in session windows with session timeout of 3 msec. + * A source fetches elements with key, timestamp, and count. + */ +object SessionWindowing { + + def main(args: Array[String]): Unit = { + +val params = ParameterTool.fromArgs(args) +val env = StreamExecutionEnvironment.getExecutionEnvironment + +env.getConfig.setGlobalJobParameters(params) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setParallelism(1) + +val fileOutput = params.has("output") + +val input = List( + ("a", 1L, 1), + ("b", 1L, 1), + ("b", 3L, 1), + ("b", 5L, 1), + ("c", 6L, 1), + // We expect to detect the session "a" earlier than this point (the old + // functionality can only detect here when the next starts) + ("a", 10L, 1), + // We expect to detect session "b" and "c" at this point as well + ("c", 11L, 1) +) + +val source = env.addSource(new SourceFunction[(String, Long, Int)]() { + + override def run(ctx: SourceContext[(String, Long, Int)]): Unit = { +input.foreach(value => { + ctx.collectWithTimestamp(value, value._2) + ctx.emitWatermark(new Watermark(value._2 - 1)) + if (!fileOutput) { --- End diff -- I'd remove this condition. Not sure how much value the printed values add to the example. 
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836691#comment-15836691 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97658391 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.windowing + +import java.util.concurrent.TimeUnit.MILLISECONDS + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * An example of grouped stream windowing into sliding time windows. 
+ * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pair. + */ +object GroupedProcessingTimeWindowExample { + + def main(args: Array[String]): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(4) + +val stream = env.addSource(new RichParallelSourceFunction[(Long, Long)]() { --- End diff -- Move the source function to a separate class? It "hides" the important aspects of the example.
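A possible shape for the extracted source is sketched below. The body of the anonymous source is not visible in this part of the diff, so the class name `DataSource`, the element count and the key range are all hypothetical; only the `(Long, Long)` element type and the `RichParallelSourceFunction` base class come from the quoted code.

```scala
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Hypothetical stand-alone source that emits (key, 1) pairs and cycles
 * through a fixed range of keys.
 */
class DataSource extends RichParallelSourceFunction[(Long, Long)] {

  // Flipped by cancel(), which may be called from another thread.
  @volatile private var running = true

  override def run(ctx: SourceContext[(Long, Long)]): Unit = {
    val numElements = 1000000L // illustrative value, not taken from the diff
    val numKeys = 1000L        // illustrative value, not taken from the diff
    var key = 1L
    var count = 0L

    while (running && count < numElements) {
      ctx.collect((key, 1L))
      count += 1
      key = if (key >= numKeys) 1L else key + 1
    }
  }

  override def cancel(): Unit = running = false
}
```

The example's `main` would then start with `env.addSource(new DataSource)`, leaving the keying and windowing calls as the first thing a reader sees.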
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836685#comment-15836685 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206812 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a --- End diff -- please verify Scaladocs style.
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836700#comment-15836700 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206373 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. 
+ * + * + * This example shows how to use: + * + * streaming iterations, + * buffer timeout to enhance latency, + * directed outputs. + * + * + */ +object IterateExample { + + private val Bound = 100 + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// obtain execution environment and set setBufferTimeout to 1 to enable +// continuous flushing of the output buffers (lowest latency) +val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1) + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create input stream of integer pairs +val inputStream = +if (params.has("input")) { + env.readTextFile(params.get("input")).map { value: String => +val record = value.substring(1, value.length - 1) +val splitted = record.split(",") +(Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1))) + } +} else { + println("Executing Iterate example with default input data set.") + println("Use --input to specify file input.") + env.addSource(new RandomFibonacciSource) +} + +def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < Bound + +// create an iterative data stream from the input with 5 second timeout +val numbers = inputStream + // Map the inputs so that the next Fibonacci numbers can be calculated + // while preserving the original input tuple + // A counter is attached to the tuple and incremented in every iteration step + .map(value => (value._1, value._2, value._1, value._2, 0)) + .iterate( +(iteration: DataStream[(Int, Int, Int, Int, Int)]) => { + // calculates the next Fibonacci number and increment the counter + val step = iteration.map(value => +(value._1, value._2, value._4, value._3 + value._4, value._5 + 1)) + // testing which tuple needs to be iterated again + val feedback = step.filter(value => withinBound(value._3, value._4)) + // get the input pairs that have the greatest iteration counter --- End diff -- 
Please check this comment. I do not see a sliding window or how the greatest iteration counter is identified. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects
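To make the review comment above easier to follow, the per-tuple logic of the iteration can be sketched in plain Scala, without Flink. This is only an illustration under stated assumptions: `IterateSketch`, `iterate`, and `run` are hypothetical names, the recursion stands in for the streaming feedback loop, and the output rule (emit a tuple once it leaves the bound) is assumed because the diff is cut off before that step. No window is involved in this sketch, which matches the reviewer's observation about the misleading comment.

```scala
object IterateSketch {
  private val Bound = 100

  // Same predicate as in the diff: both values must stay below the bound.
  def withinBound(value: (Int, Int)): Boolean = value._1 < Bound && value._2 < Bound

  // One iteration step, as in the diff: keep the original pair, advance the
  // Fibonacci pair, and increment the counter.
  def step(v: (Int, Int, Int, Int, Int)): (Int, Int, Int, Int, Int) =
    (v._1, v._2, v._4, v._3 + v._4, v._5 + 1)

  // Hypothetical stand-in for the feedback loop: tuples still within the
  // bound are fed back; the first tuple that leaves the bound is emitted
  // as (original pair, number of steps taken). The output rule is an assumption.
  @annotation.tailrec
  def iterate(v: (Int, Int, Int, Int, Int)): ((Int, Int), Int) = {
    val next = step(v)
    if (withinBound((next._3, next._4))) {
      iterate(next)
    } else {
      ((next._1, next._2), next._5)
    }
  }

  // Attach the working copy and the counter to an input pair, then iterate.
  def run(input: (Int, Int)): ((Int, Int), Int) =
    iterate((input._1, input._2, input._1, input._2, 0))
}
```

For instance, `IterateSketch.run((1, 1))` returns `((1, 1), 10)`: the pair needs ten steps before one of its values reaches the bound of 100.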
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836688#comment-15836688 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207774 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. 
+ * + * The input is a Tweet stream from a TwitterSource. + * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." 
+ ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource --- End diff -- add DataStream type > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836697#comment-15836697 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206996 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) --- End diff -- please add type to DataStreams > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836687#comment-15836687 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97657755 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, + (_: Array[Double]) => 1 +) + +// emit result +if (params.has("output")) { + prediction.writeAsText(params.get("output")) +} else { + println("Printing result to stdout. Use --output to specify output path.") + prediction.print() +} + +// execute program +env.execute("Streaming Incremental Learning") + } + + // * + // USER FUNCTIONS + // * + + /** +* Feeds new data for newData. By default it is implemented as constantly +* emitting the Integer 1 in a loop. +*/ + private class FiniteNewDataSource extends SourceFunction[Int] { +var counter: Int = 0
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15832688#comment-15832688 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2761 Hi @ch33hau, I'm sorry! I haven't had a look at it yet. I'll do that in the next few days. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.2.0 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15831758#comment-15831758 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on the issue: https://github.com/apache/flink/pull/2761 Hi @fhueske , any update on this? =) > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.2.0 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15755247#comment-15755247 ] ASF GitHub Bot commented on FLINK-3551: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2761 Thanks for the patience @ch33hau and the reviews @thvasilo! I've put this PR on my list and will have a look at it soon. Thanks, Fabian > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.2.0 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15748587#comment-15748587 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on the issue: https://github.com/apache/flink/pull/2761 Hi @thvasilo, thanks for the help. It's OK, I know that everyone is quite busy =) > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.2.0 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15748576#comment-15748576 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2761 Hello @ch33hau, sorry for the late reply; I've been at a conference for the past week. With the latest changes this LGTM. I've edited the fix version in JIRA to 1.2.0 to give this more visibility for the upcoming release, since it's very useful to have more examples. Hopefully a committer can take a look soon. I'll ping @fhueske here; maybe he can shepherd the PR or assign somebody. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.2.0 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15732356#comment-15732356 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on the issue: https://github.com/apache/flink/pull/2761 Hi @thvasilo, thank you for taking the time to look at this PR again. I have fixed the typos and a couple of one-line if statements; sorry for those mistakes =( > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15726026#comment-15726026 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r91119955 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala --- @@ -77,9 +79,8 @@ object SessionWindowing { .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L))) .sum(2) -if (fileOutput) { - aggregated.writeAsText(params.get("output")) -} else { +if (fileOutput) aggregated.writeAsText(params.get("output")) --- End diff -- Same here, braces for if. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15726027#comment-15726027 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r91119469 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -159,11 +160,46 @@ object IncrementalLearningSkeleton { * Builds up-to-date partial models on new training data. */ private class PartialModelBuilder extends AllWindowFunction[Int, Array[Double], TimeWindow] { + +protected def buildPartialModel(values: Iterable[Int]): Array[Double] = Array[Double](1) + override def apply(window: TimeWindow, - input: Iterable[Int], + values: Iterable[Int], out: Collector[Array[Double]]): Unit = { - out.collect(Array[Double](1.0)) + out.collect(buildPartialModel(values)) +} + } + + /** +* Creates newData using the model produced in batch-processing and the +* up-to-date partial model. +* +* By defaults emits the Integer 0 for every newData and the Integer 1 --- End diff -- Typo: "defaults" should be "default" > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15726028#comment-15726028 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r91119626 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala --- @@ -27,6 +27,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time +/** + * An example of grouped stream windowing into sliding time windows. + * This example uses [[RichParallelSourceFunction]] to generates a list of key-value pair. --- End diff -- Typo: "generates" should be "generate" > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15726025#comment-15726025 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r91119867 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala --- @@ -60,11 +64,9 @@ object SessionWindowing { input.foreach(value => { ctx.collectWithTimestamp(value, value._2) ctx.emitWatermark(new Watermark(value._2 - 1)) - if (!fileOutput) { -println(s"Collected: ${value}") - } + if (!fileOutput) println(s"Collected: ${value}") --- End diff -- AFAIK we encourage adding braces even for one line if statements, so the previous version was better. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
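The style point in this review comment can be illustrated with a small standalone snippet. `BraceStyle` and `report` are hypothetical names that do not appear in the PR, and the tuple type is only an assumption about the example's input; the snippet merely shows the braced form next to the one-line form it replaces.

```scala
object BraceStyle {
  // Hypothetical helper: returns the log line for a collected value,
  // or None when the job writes to a file instead of stdout.
  def report(fileOutput: Boolean, value: (String, Long, Int)): Option[String] = {
    // One-line form flagged in the review:
    //   if (!fileOutput) Some(s"Collected: $value") else None
    // Braced form, which the reviewer describes as the encouraged style:
    if (!fileOutput) {
      Some(s"Collected: $value")
    } else {
      None
    }
  }
}
```

Both forms behave identically; the braces only make the extent of each branch explicit, which tends to keep later additions to a branch from silently falling outside the `if`.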
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15720152#comment-15720152 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on the issue: https://github.com/apache/flink/pull/2761 Hi @thvasilo, sorry for the late reply. I have updated my PR according to the comments above. However, I'm not confident about the explanations for `GroupedProcessingTimeWindowExample` and `SessionWindowing`. Would you mind checking whether they explain the examples well? Thanks > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696363#comment-15696363 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89645498 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.kafka + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 +import org.apache.flink.streaming.util.serialization.SimpleStringSchema + +/** + * Read Strings from Kafka and print them to standard out. + * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! 
+ * + * Please pass the following arguments to run the example: + * --topic test + * --bootstrap.servers localhost:9092 + * --zookeeper.connect localhost:2181 + * --group.id myconsumer + * + */ +object ReadFromKafka { + + def main(args: Array[String]): Unit = { + +// parse input arguments +val params = ParameterTool.fromArgs(args) + +if (params.getNumberOfParameters < 4) { + println("Missing parameters!\nUsage: Kafka --topic " + +"--bootstrap.servers --zookeeper.connect --group.id ") + return +} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.getConfig.disableSysoutLogging + env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1)) +// create a checkpoint every 5 seconds +env.enableCheckpointing(5000) +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create a Kafka streaming source consumer for Kafka 0.8.x +val kafkaConsumer = new FlinkKafkaConsumer08( --- End diff -- I would suggest to keep it as is for now unless a project committer suggests otherwise. The linked issue would indicate that the intention is to have different versions, but I guess that would affect the release as well, since different jar files would need to be generated. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696338#comment-15696338 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89644293 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.kafka + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 +import org.apache.flink.streaming.util.serialization.SimpleStringSchema + +/** + * Read Strings from Kafka and print them to standard out. + * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! 
+ * + * Please pass the following arguments to run the example: + * --topic test + * --bootstrap.servers localhost:9092 + * --zookeeper.connect localhost:2181 + * --group.id myconsumer + * + */ +object ReadFromKafka { + + def main(args: Array[String]): Unit = { + +// parse input arguments +val params = ParameterTool.fromArgs(args) + +if (params.getNumberOfParameters < 4) { + println("Missing parameters!\nUsage: Kafka --topic " + +"--bootstrap.servers --zookeeper.connect --group.id ") + return +} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.getConfig.disableSysoutLogging + env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1)) +// create a checkpoint every 5 seconds +env.enableCheckpointing(5000) +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create a Kafka streaming source consumer for Kafka 0.8.x +val kafkaConsumer = new FlinkKafkaConsumer08( --- End diff -- Actually, I saw that there is a related issue, [FLINK-4286 Have Kafka examples that use the Kafka 0.9 connector](https://issues.apache.org/jira/browse/FLINK-4286), which basically requests examples for different Kafka versions (e.g., Kafka08.jar, Kafka09.jar). So I synced this exactly with the Java examples and thought to let FLINK-4286 take care of the different Kafka versions. However, if using a more recent one makes more sense to you, I am happy to update it. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692926#comment-15692926 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89464880 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.kafka + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 +import org.apache.flink.streaming.util.serialization.SimpleStringSchema + +/** + * Read Strings from Kafka and print them to standard out. + * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! 
+ * + * Please pass the following arguments to run the example: + * --topic test + * --bootstrap.servers localhost:9092 + * --zookeeper.connect localhost:2181 + * --group.id myconsumer + * + */ +object ReadFromKafka { + + def main(args: Array[String]): Unit = { + +// parse input arguments +val params = ParameterTool.fromArgs(args) + +if (params.getNumberOfParameters < 4) { + println("Missing parameters!\nUsage: Kafka --topic " + +"--bootstrap.servers --zookeeper.connect --group.id ") + return +} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.getConfig.disableSysoutLogging + env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1)) +// create a checkpoint every 5 seconds +env.enableCheckpointing(5000) +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create a Kafka streaming source consumer for Kafka 0.8.x +val kafkaConsumer = new FlinkKafkaConsumer08( --- End diff -- I understand that the Java example uses the 0.8 consumer as well, however is there any reason to continue using that version vs. a more recent one? > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
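For reference, here is a minimal sketch of how the example under review might look with the Kafka 0.9 connector, as the question above suggests. It is not part of the pull request. It assumes the `flink-connector-kafka-0.9` dependency is on the classpath, and the names `ReadFromKafka09`, `RequiredArgs` and `missingArgs` are hypothetical. The 0.9 consumer talks to the brokers directly, so `--zookeeper.connect` should no longer be needed.

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical variant of ReadFromKafka for the Kafka 0.9 connector (not in the PR).
object ReadFromKafka09 {

  // Arguments this sketch assumes are required by the 0.9 consumer.
  val RequiredArgs: Seq[String] = Seq("topic", "bootstrap.servers", "group.id")

  // Hypothetical helper: returns the required keys that were not passed.
  def missingArgs(params: ParameterTool): Seq[String] =
    RequiredArgs.filterNot(key => params.has(key))

  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)

    val missing = missingArgs(params)
    if (missing.nonEmpty) {
      println("Missing parameters: " + missing.mkString(", "))
      return
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // the command-line arguments double as the Kafka consumer properties
    val kafkaConsumer = new FlinkKafkaConsumer09[String](
      params.getRequired("topic"),
      new SimpleStringSchema,
      params.getProperties)

    // print the consumed strings (to the TaskManager's .out file on a cluster)
    env.addSource(kafkaConsumer).print()

    env.execute("Read from Kafka 0.9 example")
  }
}
```

Checking for named keys rather than counting parameters is a design choice of this sketch only; the example in the diff counts parameters, as the Java version does.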
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692929#comment-15692929 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89469773 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, + (_: Array[Double]) => 1 +) + +// emit result +if (params.has("output")) { + prediction.writeAsText(params.get("output")) +} else { + println("Printing result to stdout. Use --output to specify output path.") + prediction.print() +} + +// execute program +env.execute("Streaming Incremental Learning") + } + + // * + // USER FUNCTIONS + // * + + /** +* Feeds new data for newData. By default it is implemented as constantly +* emitting the Integer 1 in a loop. +*/ + private class FiniteNewDataSource extends SourceFunction[Int] { +var counter: Int = 0 + +override def run(ctx: SourceContext[Int]) = { +
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692928#comment-15692928 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89465547 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.kafka + +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08 +import org.apache.flink.streaming.util.serialization.SimpleStringSchema + +/** + * Generate a String every 500 ms and write it into a Kafka topic + * + * Please pass the following arguments to run the example: + * --topic test --bootstrap.servers localhost:9092 + * + */ +object WriteIntoKafka { + + def main(args: Array[String]): Unit = { + +// parse input arguments +val params = ParameterTool.fromArgs(args) + +if (params.getNumberOfParameters < 2) { + println("Missing parameters!") + println("Usage: Kafka --topic --bootstrap.servers ") + return +} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.getConfig.disableSysoutLogging + env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1)) + +// very simple data generator +val messageStream = env.addSource(new SourceFunction[String]() { + var running = true + + override def run(ctx: SourceContext[String]): Unit = { +var i = 0L +while (this.running) { + ctx.collect(s"Element - ${i}") + i += 1 + Thread.sleep(500) +} + } + + override def cancel(): Unit = running = false +}) + +// create a Kafka producer for Kafka 0.8.x +val kafkaProducer = new FlinkKafkaProducer08( --- End diff -- As before any reason to keep using 0.8? 
> Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692925#comment-15692925 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89471075 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.windowing + +import java.util.concurrent.TimeUnit.MILLISECONDS + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.time.Time + +object GroupedProcessingTimeWindowExample { --- End diff -- Some docstring similar to what the other examples have would be very useful here, same for the Java code that is missing it. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692924#comment-15692924 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89469371 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, --- End diff -- I think the way this example is implemented does not get the point of the example across. We are not trying to just generate a stream of 1s and 0s; the purpose is to show that we can use the connected stream coming from the `newData` and the `model` streams to read in a batch model, which we enhance with the `partialModel`, and then use the new data stream to continuously improve the partial model. I would recommend making this step more verbose, as it is in the Java version of the code. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix
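To illustrate what a more verbose version of that step could look like, here is a minimal sketch of an explicit `CoMapFunction` in Scala. It is an assumption about the intended shape, not the code that was merged: the class name `Predictor`, the helper methods, and the placeholder model values are all hypothetical. The two lambdas in the diff are replaced by one class that keeps the batch model and the latest partial model as fields.

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction

/**
 * Hypothetical predictor for the connected `newData` / `model` streams.
 * The model contents and the prediction itself are placeholders.
 */
class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  // batch model, reloaded whenever a new partial model arrives (placeholder values)
  private var batchModel: Array[Double] = Array(0.0)
  // most recent partial model received from the `model` stream
  private var partialModel: Array[Double] = Array(0.0)

  // called for every element of the first input (newData): emit a prediction
  override def map1(value: Int): Int = predict(value)

  // called for every element of the second input (model): update the models
  override def map2(value: Array[Double]): Int = {
    partialModel = value
    batchModel = getBatchModel()
    1
  }

  // placeholder: a real job would read in a previously trained model here
  protected def getBatchModel(): Array[Double] = Array(0.0)

  // placeholder: a real job would combine batchModel and partialModel here
  protected def predict(input: Int): Int = 0
}
```

Under these assumptions, the call site in `main` would become `newData.connect(model).map(new Predictor)`, which makes the roles of the two inputs visible to a reader of the example.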
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692927#comment-15692927 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89471865 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.windowing + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows +import org.apache.flink.streaming.api.windowing.time.Time + +object SessionWindowing { --- End diff -- Same here, docstring explaining the example is needed, please add for the Java version as well. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15692930#comment-15692930 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r89472254 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.windowing + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.examples.java.wordcount.util.WordCountData +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment + +/** + * Implements a windowed version of the streaming "WordCount" program. + * + * + * The input is a plain text file with lines separated by newline characters. 
+ * + * + * Usage: WordCount + * --input path + * --output path + * --window n + * --slide n + * + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}. --- End diff -- Probably better to have a Scala-style link here i.e. `[[org.apache.flink.examples.java.wordcount.util.WordCountData]]` > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Lim Chee Hau > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
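As a small illustration of the suggestion above, the closing part of the docstring could use Scaladoc's double-bracket link syntax instead of Javadoc's `{@link ...}`. This is only a sketch of the comment; the body of the object is omitted here.

```scala
/**
 * Implements a windowed version of the streaming "WordCount" program.
 *
 * If no parameters are provided, the program is run with default data from
 * [[org.apache.flink.examples.java.wordcount.util.WordCountData]].
 */
object WindowWordCount
```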
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15670912#comment-15670912 ] ASF GitHub Bot commented on FLINK-3551: --- Github user ch33hau commented on the issue: https://github.com/apache/flink/pull/2761 @thvasilo thanks! I will do that. > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15670501#comment-15670501 ] ASF GitHub Bot commented on FLINK-3551: --- Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2761 Hello Lim, thank you for your contribution! I've taken a quick look and most of these look fine, plus I see you've included the required tests. I'll do a review this week and hopefully a committer will have some time to take a second look and merge this soon. Regards, Theodore > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15640091#comment-15640091 ] ASF GitHub Bot commented on FLINK-3551: --- GitHub user ch33hau opened a pull request: https://github.com/apache/flink/pull/2761 [FLINK-3551] [examples] Sync Scala Streaiming Examples Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed - Add IterateExample to iteration example - Add ReadFromKafka to Kafka example - Add WriteIntoKafka to Kafka example - Add IncrementalLearningSkeleton to ml example - Add TwitterExample to Twitter example - Add GroupedProcessingTimeWindowExample to windowing example - Add SessionWindowing to windowing example - Add WindowWordCount to windowing example - Add WordCount to wordcount example You can merge this pull request into a Git repository by running: $ git pull https://github.com/ch33hau/flink sync-up-scala-examples Alternatively you can review and apply these changes as the patch at: 
https://github.com/apache/flink/pull/2761.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2761 commit f7f3da8bc7652f0587dcab908470157f2d513f1b Author: ch33hau Date: 2016-11-05T16:41:59Z [FLINK-3551] [examples] Sync Scala Streaiming Examples - Add IterateExample to iteration example - Add ReadFromKafka to Kafka example - Add WriteIntoKafka to Kafka example - Add IncrementalLearningSkeleton to ml example - Add TwitterExample to Twitter example - Add GroupedProcessingTimeWindowExample to windowing example - Add SessionWindowing to windowing example - Add WindowWordCount to windowing example - Add WordCount to wordcount example > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15622274#comment-15622274 ] Lim Chee Hau commented on FLINK-3551: - Wonderful =) Thanks > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1562#comment-1562 ] Stephan Ewen commented on FLINK-3551: - If you want to help out with adding the Scala variants of these examples, that would be amazing! In my personal opinion, one could probably drop the "PojoExample". > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples
[ https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15619447#comment-15619447 ] Lim Chee Hau commented on FLINK-3551: - Hi, I'm pretty new to Apache Flink, and *flink-example* is undoubtedly one of my learning materials. I noticed that the Scala examples are quite limited at the moment, and I saw that this issue had already been created. Would it be possible for me to contribute to this task? I have working experience in both Java and Scala (less in Scala, by the way). Below is the list of examples that are currently available in Java but not in Scala: - iteration/IterateExample - kafka/ReadFromKafka - kafka/WriteIntoKafka - ml/IncrementalLearningSkeleton - socket/SocketWindowWordCount - twitter/TwitterExample - windowing/GroupedProcessingTimeWindowExample - windowing/SessionWindowing - windowing/WindowWordCount - wordcount/PojoExample - wordcount/WordCount Thanks =) > Sync Scala and Java Streaming Examples > -- > > Key: FLINK-3551 > URL: https://issues.apache.org/jira/browse/FLINK-3551 > Project: Flink > Issue Type: Sub-task > Components: Examples >Affects Versions: 1.0.0 >Reporter: Stephan Ewen > Fix For: 1.0.1 > > > The Scala Examples lack behind the Java Examples -- This message was sent by Atlassian JIRA (v6.3.4#6332)