Repository: incubator-flink Updated Branches: refs/heads/master 92ceacd23 -> baf81c6c3
[scala] [streaming] added scala streams as sources in streaming-api scala examples Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/baf81c6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/baf81c6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/baf81c6c Branch: refs/heads/master Commit: baf81c6c31016d4ef02584fea1b6d2d4db1168c2 Parents: 5daa45c Author: carbone <[email protected]> Authored: Tue Jan 6 13:53:01 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Jan 6 15:09:04 2015 +0100 ---------------------------------------------------------------------- .../examples/windowing/TopSpeedWindowing.scala | 44 ++++++++++---------- .../scala/examples/windowing/WindowJoin.scala | 36 ++++++++-------- 2 files changed, 40 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/baf81c6c/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index a18eb37..a43f479 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -21,15 +21,13 @@ package org.apache.flink.streaming.scala.examples.windowing import java.util.concurrent.TimeUnit._ -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.util.Collector -import scala.math.{max, min} - +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.windowing.{Delta, Time} +import org.apache.flink.api.scala._ +import scala.Stream._ +import scala.math._ import scala.util.Random -import org.apache.flink.streaming.api.scala.windowing.Time -import org.apache.flink.streaming.api.scala.windowing.Delta - /** * An example of grouped stream windowing where different eviction and * trigger policies can be used. A source fetches events from cars @@ -40,7 +38,7 @@ import org.apache.flink.streaming.api.scala.windowing.Delta */ object TopSpeedWindowing { - case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long) + case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable def main(args: Array[String]) { if (!parseParameters(args)) { @@ -48,10 +46,11 @@ object TopSpeedWindowing { } val env = StreamExecutionEnvironment.getExecutionEnvironment - val cars = env.addSource(carSource _).groupBy("carId") + val cars = env.fromCollection(genCarStream()) + .groupBy("carId") .window(Time.of(evictionSec, SECONDS)) - .every(Delta.of[CarSpeed](triggerMeters, - (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0))) + .every(Delta.of[CarEvent](triggerMeters, + (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0))) .reduce((x, y) => if (x.speed > y.speed) x else y) cars print @@ -60,19 +59,20 @@ object TopSpeedWindowing { } - def carSource(out: Collector[CarSpeed]) = { - - val speeds = new Array[Int](numOfCars) - val distances = new Array[Double](numOfCars) + def genCarStream(): Stream[CarEvent] = { - while (true) { - Thread sleep 1000 - for (i <- 0 until speeds.length) { - speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5) - distances(i) += speeds(i) / 3.6d - out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis)) - } + def nextSpeed(carEvent : CarEvent) : CarEvent = + { + val next = + if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) + CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis) + } + def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = + { + Thread.sleep(1000) + speeds.append(carStream(speeds.map(nextSpeed))) } + carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis()))) } def parseParameters(args: Array[String]): Boolean = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/baf81c6c/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala index 08c7d65..d6c0363 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala @@ -18,8 +18,10 @@ package org.apache.flink.streaming.scala.examples.windowing -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.util.Collector +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment + +import scala.Stream._ import scala.util.Random import java.util.concurrent.TimeUnit @@ -34,8 +36,8 @@ object WindowJoin { val env = StreamExecutionEnvironment.getExecutionEnvironment //Create streams for names and ages by mapping the inputs to the corresponding objects - val names = env.addSource(nameStream _).map(x => Name(x._1, x._2)) - val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2)) + val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2)) + val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2)) //Join the two input streams by id on the last second every 2 seconds and create new //Person objects containing both name and age @@ -48,24 +50,22 @@ object WindowJoin { env.execute("WindowJoin") } - //Stream source for generating (id, name) pairs - def nameStream(out: Collector[(Long, String)]) = { - val names = Array("tom", "jerry", "alice", "bob", "john", "grace") - - for (i <- 1 to 10000) { - if (i % 100 == 0) Thread.sleep(1000) else { - out.collect((i, names(Random.nextInt(names.length)))) - } + def nameStream() : Stream[(Long,String)] = { + def nameMapper(names: Array[String])(x: Int) : (Long, String) = + { + if(x%100==0) Thread.sleep(1000) + (x, names(Random.nextInt(names.length))) } + range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", "john", "grace"))) } - //Stream source for generating (id, age) pairs - def ageStream(out: Collector[(Long, Int)]) = { - for (i <- 1 to 10000) { - if (i % 100 == 0) Thread.sleep(1000) else { - out.collect((i, Random.nextInt(90))) - } + def ageStream() : Stream[(Long,Int)] = { + def ageMapper(x: Int) : (Long, Int) = + { + if(x%100==0) Thread.sleep(1000) + (x, Random.nextInt(90)) } + range(1,10000).map(ageMapper) } }
