[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2761 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r124667752 --- Diff: flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala --- @@ -0,0 +1,136 @@ +package org.apache.flink.streaming.scala.examples --- End diff -- AL2 header missing
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r124658721 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java --- @@ -126,4 +94,40 @@ public void apply(Long key, Window window, Iterable> values, return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } } + + /** +* Parallel data source that serves a list of key-value pair. --- End diff -- +s -> `... list of key-value pairs.`
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r124658613 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java --- @@ -34,49 +34,17 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; /** - * Example of grouped processing time windows. + * An example of grouped stream windowing into sliding time windows. + * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pair. --- End diff -- +s -> `... list of key-value pairs.`
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r124667797 --- Diff: flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java --- @@ -0,0 +1,135 @@ +package org.apache.flink.streaming.test; --- End diff -- AL2 header missing
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r124658452 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java --- @@ -70,9 +70,7 @@ public void run(SourceContext> ctx) throws Excepti for (Tuple3 value : input) { ctx.collectWithTimestamp(value, value.f1); ctx.emitWatermark(new Watermark(value.f1 - 1)); - if (!fileOutput) { - System.out.println("Collected: " + value); - } + System.out.println("Collected: " + value); --- End diff -- remove the `println` as well
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97205993 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. --- End diff -- Scaladocs are commented with
```
/**
  *
  */
```
instead of
```
/**
 *
 */
```
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97205961 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions --- End diff -- The use of HTML is discouraged in Scaladoc. Instead use wiki markup (markdown) whenever possible. See [Scaladoc guidelines](http://docs.scala-lang.org/style/scaladoc.html).
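As a hedged illustration of the suggestion, the class comment from the diff might read as follows with Scaladoc wiki markup instead of HTML: paragraphs are separated by blank `*` lines and the list uses ` - ` bullets. The object body is trimmed to the `Bound` constant from the diff (made public here only so the snippet can be checked); `main` and the user functions are omitted.

```scala
/**
  * Example illustrating iterations in Flink streaming.
  *
  * The program sums up random numbers and counts the additions it performs
  * to reach a specific threshold in an iterative streaming fashion.
  *
  * This example shows how to use:
  *
  *  - streaming iterations,
  *  - a buffer timeout to reduce latency,
  *  - directed outputs.
  */
object IterateExample {
  // Threshold from the reviewed diff (private there; public here for the check).
  val Bound = 100
}
```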
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97658391 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.windowing + +import java.util.concurrent.TimeUnit.MILLISECONDS + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * An example of grouped stream windowing into sliding time windows. + * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pair. 
+ */ +object GroupedProcessingTimeWindowExample { + + def main(args: Array[String]): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(4) + +val stream = env.addSource(new RichParallelSourceFunction[(Long, Long)]() { --- End diff -- Move the source function to a separate class? It "hides" the important aspects of the example.
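A sketch of the suggested refactoring, assuming the generator only needs to emit `(key, 1L)` pairs while cycling through a fixed key space (the key count and element budget are illustrative parameters, not values from the PR). To keep the snippet runnable without Flink, `Emitter` is a hypothetical stand-in for Flink's `SourceContext`. In the real example the named class would extend `RichParallelSourceFunction[(Long, Long)]` and override `run(ctx: SourceContext[(Long, Long)])` and `cancel()`, so `main` only needs an `env.addSource(...)` call.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Flink's SourceContext: receives emitted records.
trait Emitter[T] {
  def collect(record: T): Unit
}

// Named generator with the run/cancel shape of a Flink source function.
// numKeys and numElements are illustrative parameters, not values from the PR.
class KeyValueSource(numKeys: Long, numElements: Long) extends Serializable {
  @volatile private var running = true

  def run(ctx: Emitter[(Long, Long)]): Unit = {
    var key = 0L
    var emitted = 0L
    while (running && emitted < numElements) {
      ctx.collect((key, 1L))     // each record counts once for its key
      key = (key + 1) % numKeys  // cycle through the key space
      emitted += 1
    }
  }

  // In Flink, cancel() is called from another thread, hence the @volatile flag.
  def cancel(): Unit = running = false
}
```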
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97657755 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, + (_: Array[Double]) => 1 +) + +// emit result +if (params.has("output")) { + prediction.writeAsText(params.get("output")) +} else { + println("Printing result to stdout. Use --output to specify output path.") + prediction.print() +} + +// execute program +env.execute("Streaming Incremental Learning") + } + + // * + // USER FUNCTIONS + // * + + /** +* Feeds new data for newData. By default it is implemented as constantly +* emitting the Integer 1 in a loop. +*/ + private class FiniteNewDataSource extends SourceFunction[Int] { +var counter: Int = 0 + +override def run(ctx: SourceContext[Int]) = { + Thread.sleep(15) + while (counter < 50) { +ctx.collect(getNewData) + } +} + +def getNewData = { +
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206069 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. + * + * + * This example shows how to use: + * + * streaming iterations, + * buffer timeout to enhance latency, + * directed outputs. 
+ * + * + */ +object IterateExample { + + private val Bound = 100 + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// obtain execution environment and set setBufferTimeout to 1 to enable +// continuous flushing of the output buffers (lowest latency) +val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1) + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create input stream of integer pairs +val inputStream = --- End diff -- I find it helpful to add the type of the DataStream.
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207859 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. + * + * The input is a Tweet stream from a TwitterSource. 
+ * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." 
+ ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource + // selecting English tweets and splitting to (word, 1) + .flatMap { value: String => + val jsonParser = new ObjectMapper() --- End diff -- Creating a new `ObjectMapper()` in each function call is quite expensive
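The fix is to build the mapper once per function instance and reuse it across records. In Flink this is commonly done either with a `RichFlatMapFunction` that creates the mapper in `open()`, or with a `@transient lazy val` field. The plain-Scala sketch below models the second pattern without Flink or Jackson: `ExpensiveParser` is a hypothetical stand-in for `ObjectMapper`, its toy `words` method is not Jackson's API, and `ParserStats` counts how many instances get built.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many parser instances have been built.
object ParserStats {
  val created = new AtomicInteger(0)
}

// Hypothetical stand-in for an expensive-to-construct helper such as Jackson's ObjectMapper.
class ExpensiveParser {
  ParserStats.created.incrementAndGet()

  // Toy parsing: lower-case the text and split it into words.
  def words(text: String): Seq[String] =
    text.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq
}

// Pattern from the diff: a new parser is built for every record.
class PerRecordTokenizer extends Serializable {
  def apply(value: String): Seq[(String, Int)] =
    new ExpensiveParser().words(value).map(w => (w, 1))
}

// Suggested pattern: build the parser lazily on first use and keep it.
// @transient keeps it out of the serialized function; each deserialized
// copy (for example, one per parallel task) builds its own instance once.
class ReusingTokenizer extends Serializable {
  @transient private lazy val parser = new ExpensiveParser()

  def apply(value: String): Seq[(String, Int)] =
    parser.words(value).map(w => (w, 1))
}
```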
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206812 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a --- End diff -- please verify Scaladocs style.
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207774 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. + * + * The input is a Tweet stream from a TwitterSource. 
+ * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." + ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource --- End diff -- add DataStream type
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207663 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word --- End diff -- Scaladocs style
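One possible Scaladoc-style version of the `TwitterExample` class comment, sketched as an assumption: a `{{{ ... }}}` block replaces the HTML formatting around the usage text, `[[...]]` replaces the Javadoc-only `{@link ...}`, and the duplicated "Usage: Usage:" is dropped. The argument placeholders (`<path>`, `<key>`, and so on) are guesses, because the archived diff lost them; the object body keeps only a usage string so the snippet compiles on its own.

```scala
/**
  * Implements the "TwitterStream" program that computes the most used words
  * over JSON objects in a streaming fashion.
  *
  * The input is a Tweet stream from a `TwitterSource`.
  *
  * Usage:
  * {{{
  *   TwitterExample [--output <path>]
  *                  [--twitter-source.consumerKey <key>
  *                   --twitter-source.consumerSecret <secret>
  *                   --twitter-source.token <token>
  *                   --twitter-source.tokenSecret <tokenSecret>]
  * }}}
  *
  * If no parameters are provided, the program is run with default data from
  * [[org.apache.flink.streaming.examples.twitter.util.TwitterExampleData]].
  *
  * This example shows how to:
  *
  *  - acquire external data,
  *  - use in-line defined functions,
  *  - handle flattened stream inputs.
  */
object TwitterExample {
  // Usage line printed by main(); the placeholder names are assumptions.
  val Usage: String =
    "Usage: TwitterExample [--output <path>] " +
      "[--twitter-source.consumerKey <key> " +
      "--twitter-source.consumerSecret <secret> " +
      "--twitter-source.token <token> " +
      "--twitter-source.tokenSecret <tokenSecret>]"
}
```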
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97654855 --- Diff: flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/iteration/IterateExampleITCase.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleScalaPrograms.iteration; + +import org.apache.flink.streaming.scala.examples.iteration.IterateExample; +import org.apache.flink.streaming.examples.iteration.util.IterateExampleData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IterateExampleITCase extends StreamingProgramTestBase { --- End diff -- ITCases extending `StreamingProgramTestBase` are very expensive because they internally start a Flink minicluster, which takes a significant amount of time, usually much more than the actual test. The class `StreamingMultipleProgramsTestBase` allows reusing the minicluster across several tests. I would suggest porting all existing example tests (Java and Scala) into a single ITCase that extends `StreamingMultipleProgramsTestBase`. This should reduce Flink's build time.
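The cost argument above can be illustrated without Flink. The following is a minimal, dependency-free sketch. It assumes a hypothetical `FakeMiniCluster` class whose constructor stands in for the expensive minicluster start-up. It is not a Flink API, and it does not model `StreamingMultipleProgramsTestBase` itself. It counts how often the fixture is started in two cases: when each of three example jobs gets its own fixture, and when all three share one lazily created instance.

```scala
// Counts fixture start-ups; purely illustrative, not part of Flink.
object StartCounter {
  var starts: Int = 0
}

// Hypothetical stand-in for an expensive test fixture such as a minicluster;
// the "expensive" start-up happens in the constructor.
final class FakeMiniCluster {
  StartCounter.starts += 1
  def run(job: String): String = s"finished $job"
}

// One fixture per test: every test pays the start-up cost.
object PerTestFixture {
  def runAll(jobs: Seq[String]): Seq[String] =
    jobs.map(job => new FakeMiniCluster().run(job))
}

// One fixture per suite: started once on first use, then reused by every test.
object SharedFixture {
  lazy val cluster: FakeMiniCluster = new FakeMiniCluster()
  def runAll(jobs: Seq[String]): Seq[String] =
    jobs.map(job => cluster.run(job))
}
```

In this model, sharing reduces start-ups from one per test to one per suite. That is the saving the comment expects for the build time.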
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207844 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.twitter + +import java.util.StringTokenizer + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.connectors.twitter.TwitterSource +import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData +import org.codehaus.jackson.JsonNode +import org.codehaus.jackson.map.ObjectMapper + +import scala.collection.mutable.ListBuffer + +/** + * Implements the "TwitterStream" program that computes a most used word + * occurrence over JSON objects in a streaming fashion. + * + * The input is a Tweet stream from a TwitterSource. 
+ * + * + * Usage: Usage: TwitterExample [--output ] + * [--twitter-source.consumerKey + * --twitter-source.consumerSecret + * --twitter-source.token + * --twitter-source.tokenSecret ] + * + * + * If no parameters are provided, the program is run with default data from + * {@link TwitterExampleData}. + * + * + * This example shows how to: + * + * acquire external data, + * use in-line defined functions, + * handle flattened stream inputs. + * + */ +object TwitterExample { + + def main(args: Array[String]): Unit = { + +// Checking input parameters +val params = ParameterTool.fromArgs(args) +println("Usage: TwitterExample [--output ] " + + "[--twitter-source.consumerKey " + + "--twitter-source.consumerSecret " + + "--twitter-source.token " + + "--twitter-source.tokenSecret ]") + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +env.setParallelism(params.getInt("parallelism", 1)) + +// get input data +val streamSource = +if (params.has(TwitterSource.CONSUMER_KEY) && + params.has(TwitterSource.CONSUMER_SECRET) && + params.has(TwitterSource.TOKEN) && + params.has(TwitterSource.TOKEN_SECRET) +) { + env.addSource(new TwitterSource(params.getProperties)) +} else { + print("Executing TwitterStream example with default props.") + print("Use --twitter-source.consumerKey --twitter-source.consumerSecret " + +"--twitter-source.token " + +"--twitter-source.tokenSecret specify the authentication info." + ) + // get default test text data + env.fromElements(TwitterExampleData.TEXTS: _*) +} + +val tweets = streamSource + // selecting English tweets and splitting to (word, 1) + .flatMap { value: String => --- End diff -- I think this complex function should be moved into a separate class.
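Moving the inline `flatMap` lambda into a named class could look roughly like the sketch below. This is an illustration built on assumptions, not the code that was merged:

- The class name `EnglishTweetTokenizer` is hypothetical.
- JSON parsing with the Jackson `ObjectMapper` is left out. The language code and tweet text are passed in already extracted.
- It does not extend Flink's `FlatMapFunction[String, (String, Int)]` or emit into a `Collector`. Instead it returns a `List`, so it can run and be unit-tested without Flink on the classpath.

It reuses the `StringTokenizer` and `ListBuffer` that the quoted file already imports.

```scala
import java.util.StringTokenizer
import scala.collection.mutable.ListBuffer

// Hypothetical named replacement for the inline lambda. In the job it would extend
// FlatMapFunction[String, (String, Int)] and emit through a Collector; here it
// returns a List so it can be tested without Flink. JSON parsing is left out:
// `lang` and `text` are assumed to be already extracted from the tweet.
class EnglishTweetTokenizer extends Serializable {

  def tokenize(lang: String, text: String): List[(String, Int)] = {
    val out = ListBuffer.empty[(String, Int)]
    // keep only English tweets that actually carry text
    if (lang == "en" && text != null) {
      // StringTokenizer splits on whitespace by default, so tokens are never empty
      val tokenizer = new StringTokenizer(text)
      while (tokenizer.hasMoreTokens()) {
        val word = tokenizer.nextToken().toLowerCase
        out += ((word, 1))
      }
    }
    out.toList
  }
}
```

A named class can also carry its own Scaladoc and unit test, which is awkward for an anonymous lambda.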
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97657374 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, + (_: Array[Double]) => 1 +) + +// emit result +if (params.has("output")) { + prediction.writeAsText(params.get("output")) +} else { + println("Printing result to stdout. Use --output to specify output path.") + prediction.print() +} + +// execute program +env.execute("Streaming Incremental Learning") + } + + // * + // USER FUNCTIONS + // * + + /** +* Feeds new data for newData. By default it is implemented as constantly +* emitting the Integer 1 in a loop. +*/ + private class FiniteNewDataSource extends SourceFunction[Int] { +var counter: Int = 0 + +override def run(ctx: SourceContext[Int]) = { + Thread.sleep(15) --- End diff -- can be simplified to ``` Thread.sleep(15) (0 until 50).foreach{ _ => Thread.sleep(5)
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97651600 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.wordcount + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.examples.java.wordcount.util.WordCountData +import org.apache.flink.streaming.api.scala._ + +/** + * Implements the "WordCount" program that computes a simple word occurrence --- End diff -- Scaladocs
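The one-word note presumably asks for Scaladoc rather than Javadoc conventions in the ported comments. The sketch below is hedged: the object `WordCountDocSketch` and its `tokenize` helper are hypothetical and not part of the PR. It shows the main differences:

- Scaladoc uses `[[...]]` for links where Javadoc uses `{@link ...}`.
- It uses `{{{ ... }}}` for code blocks where Javadoc uses `<pre>`.
- It uses indented `-` bullets where Javadoc uses `<ul><li>`.

The layout, with gutter asterisks under the second `*` of `/**`, follows the Scala style guide. The helper mirrors a typical Scala word-count chain (`toLowerCase.split("\\W+")`, `filter(_.nonEmpty)`, `map((_, 1))`) so that the block has something testable.

```scala
/** Hypothetical helper illustrating Scaladoc markup for a word-count example.
  *
  * Splits text into lower-cased words and pairs each word with a count of one.
  *
  * Usage:
  * {{{
  * WordCountDocSketch.tokenize("To be, or not")
  * // List((to,1), (be,1), (or,1), (not,1))
  * }}}
  *
  * The real example reads its default input from
  * [[org.apache.flink.examples.java.wordcount.util.WordCountData]].
  *
  * This sketch shows how to:
  *  - link to other classes,
  *  - embed a code block,
  *  - write a bullet list.
  */
object WordCountDocSketch {

  /** Tokenizes a line into `(word, 1)` pairs.
    *
    * @param line a line of input text
    * @return one `(word, 1)` pair per non-empty word, in input order
    */
  def tokenize(line: String): List[(String, Int)] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_, 1)).toList
}
```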
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206987 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData --- End diff -- Please add an explicit type.
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97207217 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) +val newData = env.addSource(new FiniteNewDataSource) + +val model = trainingData + .assignTimestampsAndWatermarks(new LinearTimestamp) + .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS)) + .apply(new PartialModelBuilder) + +// use partial model for newData +val prediction = newData.connect(model).map( + (_: Int) => 0, --- End diff -- I agree with @thvasilo. We should copy the code of the Java job. Otherwise, this example just demonstrates how to use `connect()` and `CoMapFunction`. For that we would not need custom sources and window aggregation.
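Porting the Java job's logic would mean replacing the two constant lambdas with a stateful two-input function. That function keeps the latest model and uses it for predictions. The sketch below assumes this shape, with types matching the quoted `map` call:

- Input 1 carries new data points as `Int`.
- Input 2 carries partial models as `Array[Double]`.
- The output is `Int`.

`Predictor`, `loadBatchModel` and `predict` are illustrative names with placeholder bodies. In the job, the class would extend `CoMapFunction[Int, Array[Double], Int]`, which is already imported in the quoted file, and would be passed as `newData.connect(model).map(new Predictor)`. Here it is kept dependency-free so it runs on its own.

```scala
// Illustrative sketch with hypothetical names; in the Flink job this class would
// extend org.apache.flink.streaming.api.functions.co.CoMapFunction[Int, Array[Double], Int].
class Predictor extends Serializable {
  // Model built offline by a batch job (empty until the first update).
  private var batchModel: Option[Array[Double]] = None
  // Latest partial model built from the windowed training data.
  private var partialModel: Option[Array[Double]] = None

  // Input 1, a new data point: answer with a prediction from the current models.
  def map1(value: Int): Int = predict(value)

  // Input 2, a new partial model: remember it, refresh the batch model, emit a marker.
  def map2(model: Array[Double]): Int = {
    partialModel = Some(model)
    batchModel = Some(loadBatchModel())
    1
  }

  def hasModel: Boolean = partialModel.isDefined && batchModel.isDefined

  // Placeholder: would fetch the model built by a batch job on old training data.
  protected def loadBatchModel(): Array[Double] = Array(0.0)

  // Placeholder: would combine batchModel and partialModel to score `value`.
  protected def predict(value: Int): Int = 0
}
```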
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206373 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. + * + * + * This example shows how to use: + * + * streaming iterations, + * buffer timeout to enhance latency, + * directed outputs. 
+ * + * + */ +object IterateExample { + + private val Bound = 100 + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// obtain execution environment and set setBufferTimeout to 1 to enable +// continuous flushing of the output buffers (lowest latency) +val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1) + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create input stream of integer pairs +val inputStream = +if (params.has("input")) { + env.readTextFile(params.get("input")).map { value: String => +val record = value.substring(1, value.length - 1) +val splitted = record.split(",") +(Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1))) + } +} else { + println("Executing Iterate example with default input data set.") + println("Use --input to specify file input.") + env.addSource(new RandomFibonacciSource) +} + +def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < Bound + +// create an iterative data stream from the input with 5 second timeout +val numbers = inputStream + // Map the inputs so that the next Fibonacci numbers can be calculated + // while preserving the original input tuple + // A counter is attached to the tuple and incremented in every iteration step + .map(value => (value._1, value._2, value._1, value._2, 0)) + .iterate( +(iteration: DataStream[(Int, Int, Int, Int, Int)]) => { + // calculates the next Fibonacci number and increment the counter + val step = iteration.map(value => +(value._1, value._2, value._4, value._3 + value._4, value._5 + 1)) + // testing which tuple needs to be iterated again + val feedback = step.filter(value => withinBound(value._3, value._4)) + // get the input pairs that have the greatest iteration counter --- End diff -- Please check this comment. I do not see a sliding window or how the greatest iteration counter is identified. 
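Going by the quoted step and filter, the iteration appears to do the following for each input pair: keep applying the Fibonacci step until the next two numbers are no longer both below `Bound`, then emit the original pair together with the number of steps taken. Nothing in the quoted code picks a "greatest" counter. The sketch below simulates that loop for a single pair in plain Scala, as an aid for rewording the comment. `IterateStepSketch` is a hypothetical helper. Two assumptions apply:

- Inputs are positive. For `(0, 0)` the step never grows the values, so the loop would not terminate.
- The output side emits `((a, b), counter)`. This is an assumption about code that the quote does not show.

```scala
// Plain-Scala simulation (hypothetical helper), not Flink's iterate API.
object IterateStepSketch {
  val Bound = 100

  // (first input, second input, current Fibonacci pair (2 fields), iteration counter)
  type State = (Int, Int, Int, Int, Int)

  def withinBound(a: Int, b: Int): Boolean = a < Bound && b < Bound

  // One iteration step: compute the next Fibonacci number and bump the counter.
  def step(s: State): State = (s._1, s._2, s._4, s._3 + s._4, s._5 + 1)

  // Apply the step until the pair leaves the bound. The recursive call plays the
  // role of the feedback edge; the returned value plays the role of the output edge.
  @annotation.tailrec
  def loop(s: State): ((Int, Int), Int) = {
    val next = step(s)
    if (withinBound(next._3, next._4)) loop(next)
    else ((next._1, next._2), next._5)
  }

  def countSteps(a: Int, b: Int): ((Int, Int), Int) = loop((a, b, a, b, 0))
}
```

For example, `(1, 1)` walks through 1, 2, 3, 5, ..., 89 and leaves the bound at 144 after 10 steps.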
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97658665 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.windowing + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * An example of grouped stream windowing in session windows with session timeout of 3 msec. + * A source fetches elements with key, timestamp, and count. 
+ */ +object SessionWindowing { + + def main(args: Array[String]): Unit = { + +val params = ParameterTool.fromArgs(args) +val env = StreamExecutionEnvironment.getExecutionEnvironment + +env.getConfig.setGlobalJobParameters(params) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setParallelism(1) + +val fileOutput = params.has("output") + +val input = List( + ("a", 1L, 1), + ("b", 1L, 1), + ("b", 3L, 1), + ("b", 5L, 1), + ("c", 6L, 1), + // We expect to detect the session "a" earlier than this point (the old + // functionality can only detect here when the next starts) + ("a", 10L, 1), + // We expect to detect session "b" and "c" at this point as well + ("c", 11L, 1) +) + +val source = env.addSource(new SourceFunction[(String, Long, Int)]() { + + override def run(ctx: SourceContext[(String, Long, Int)]): Unit = { +input.foreach(value => { + ctx.collectWithTimestamp(value, value._2) + ctx.emitWatermark(new Watermark(value._2 - 1)) + if (!fileOutput) { --- End diff -- I'd remove this condition. Not sure how much value the printed values add to the example.
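The quoted input list and its comments describe which sessions the example should detect. The sketch below is a plain-Scala model of that grouping, not Flink's window machinery, and `SessionWindowSketch` is a hypothetical helper name. It assigns each event to a per-key session that stays open while the next event arrives within the 3 ms gap, and it reports each session as `(key, first timestamp, summed count)`. Two points differ from, or go beyond, the quoted code:

- An event arriving exactly `gap` ms after the previous one is treated as part of the same session (`<=`). This boundary handling is the sketch's own assumption; none of the example's events sit on that boundary.
- Results are sorted by key. The streaming job instead emits each session when the watermark closes it.

```scala
// Plain-Scala model of per-key session grouping (hypothetical helper).
// The `<=` boundary check is an assumption of this sketch.
object SessionWindowSketch {
  type Event = (String, Long, Int) // (key, timestamp in ms, count)

  def sessions(events: Seq[Event], gap: Long): Seq[Event] =
    events.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (key, evs) =>
      // accumulator: sessions as (start, lastSeen, sum), most recent first
      val perKey = evs.sortBy(_._2).foldLeft(List.empty[(Long, Long, Int)]) {
        case ((start, last, sum) :: rest, (_, ts, n)) if ts - last <= gap =>
          (start, ts, sum + n) :: rest // extend the open session
        case (acc, (_, ts, n)) =>
          (ts, ts, n) :: acc // gap exceeded (or first event): start a new session
      }
      perKey.reverse.map { case (start, _, sum) => (key, start, sum) }
    }
}
```

With this model, `b` forms one session of three events. `a` and `c` each split into two sessions, because their gaps (9 ms and 5 ms) exceed 3 ms.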
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206996 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.ml + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Skeleton for incremental machine learning algorithm consisting of a + * pre-computed model, which gets updated for the new inputs and new input data + * for which the job provides predictions. + * + * + * This may serve as a base of a number of algorithms, e.g. updating an + * incremental Alternating Least Squares model while also providing the + * predictions. 
+ * + * + * This example shows how to use: + * + * Connected streams + * CoFunctions + * Tuple data types + * + */ +object IncrementalLearningSkeleton { + + // * + // PROGRAM + // * + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// set up the execution environment +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// build new model on every second of new data +val trainingData = env.addSource(new FiniteTrainingDataSource) --- End diff -- Please add explicit types to the DataStreams.
[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2761#discussion_r97206112 --- Diff: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.iteration + +import java.util.Random + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} + +/** + * Example illustrating iterations in Flink streaming. + * The program sums up random numbers and counts additions + * it performs to reach a specific threshold in an iterative streaming fashion. + * + * + * This example shows how to use: + * + * streaming iterations, + * buffer timeout to enhance latency, + * directed outputs. 
+ * + * + */ +object IterateExample { + + private val Bound = 100 + + def main(args: Array[String]): Unit = { +// Checking input parameters +val params = ParameterTool.fromArgs(args) + +// obtain execution environment and set setBufferTimeout to 1 to enable +// continuous flushing of the output buffers (lowest latency) +val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1) + +// make parameters available in the web interface +env.getConfig.setGlobalJobParameters(params) + +// create input stream of integer pairs +val inputStream = +if (params.has("input")) { + env.readTextFile(params.get("input")).map { value: String => --- End diff -- Please add a comment describing what the map function does
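One way to address this is to pull the parsing into a small named function with a comment for each step. The sketch below is a hypothetical refactoring: `InputPairParser.parsePair` is not in the PR. It keeps the quoted logic unchanged and therefore assumes input lines of the form `(x,y)` with no spaces. This is because `substring` drops exactly one character at each end, and `toInt`, like `Integer.parseInt`, rejects surrounding whitespace.

```scala
// Hypothetical refactoring of the quoted map function; same logic, documented steps.
object InputPairParser {

  /** Parses one input line of the form "(x,y)" into an (Int, Int) pair. */
  def parsePair(line: String): (Int, Int) = {
    val record = line.substring(1, line.length - 1) // "(3,5)" -> "3,5"
    val fields = record.split(",")                  // "3,5"   -> Array("3", "5")
    (fields(0).toInt, fields(1).toInt)              // -> (3, 5)
  }
}
```

In the job, the map would then read `env.readTextFile(params.get("input")).map(InputPairParser.parsePair _)`.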