[scala] [streaming] Added connect op to DataStream and implicit conversion for ConnectedDataStreams
[scala] [streaming] Changed return types to implicits Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eaf726a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eaf726a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eaf726a Branch: refs/heads/release-0.8 Commit: 7eaf726aeab1ac461e934a4b9d5f87404a64bc0d Parents: b094d49 Author: carbone <[email protected]> Authored: Sat Jan 3 20:00:13 2015 +0100 Committer: mbalassi <[email protected]> Committed: Mon Jan 5 18:07:21 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/WindowedDataStream.java | 6 +- .../windowing/TopSpeedWindowingExample.java | 175 ++++++++++--------- .../streaming/windowing/TopSpeedWindowing.scala | 11 +- .../scala/streaming/ConnectedDataStream.scala | 171 +++++++++--------- .../flink/api/scala/streaming/DataStream.scala | 15 +- .../scala/streaming/StreamingConversions.scala | 6 +- 6 files changed, 208 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 287f29d..e81395d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -231,7 +231,7 @@ public class 
WindowedDataStream<OUT> { * @return The transformed DataStream */ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) { - return dataStream.transform("NextGenWindowReduce", getType(), + return dataStream.transform("WindowReduce", getType(), getReduceInvokable(reduceFunction)); } @@ -255,7 +255,7 @@ public class WindowedDataStream<OUT> { TypeInformation<R> outType = TypeExtractor .getGroupReduceReturnTypes(reduceFunction, inType); - return dataStream.transform("NextGenWindowReduce", outType, + return dataStream.transform("WindowReduce", outType, getReduceGroupInvokable(reduceFunction)); } @@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> { public <R> SingleOutputStreamOperator<R, ?> reduceGroup( GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) { - return dataStream.transform("NextGenWindowReduce", outType, + return dataStream.transform("WindowReduce", outType, getReduceGroupInvokable(reduceFunction)); } http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java index bc3bba5..0f5d8eb 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.examples.windowing; - import 
org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -32,90 +31,98 @@ import java.util.Random; import java.util.concurrent.TimeUnit; /** - * An example of grouped stream windowing where different eviction and trigger policies can be used. - * A source fetches events from cars every 1 sec containing their id, their current speed (kmh), - * overall elapsed distance (m) and a timestamp. The streaming - * example triggers the top speed of each car every x meters elapsed for the last y seconds. + * An example of grouped stream windowing where different eviction and trigger + * policies can be used. A source fetches events from cars every 1 sec + * containing their id, their current speed (kmh), overall elapsed distance (m) + * and a timestamp. The streaming example triggers the top speed of each car + * every x meters elapsed for the last y seconds. */ public class TopSpeedWindowingExample { - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - DataStream topSpeeds = env - .addSource(CarSource.create(numOfCars)) - .groupBy(0) - .window(Time.of(evictionSec, TimeUnit.SECONDS)) - .every(Delta.of( - new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() { - @Override - public double getDelta(Tuple4<Integer, Integer, Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) { - return newDataPoint.f2 - oldDataPoint.f2; - } - } - , new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l), triggerMeters)) - .maxBy(1); - - topSpeeds.print(); - env.execute("CarTopSpeedWindowingExample"); - } - - private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> { - private Integer[] speeds; - private 
Double[] distances; - - private Random rand = new Random(); - - private CarSource(int numOfCars) { - speeds = new Integer[numOfCars]; - distances = new Double[numOfCars]; - Arrays.fill(speeds, 50); - Arrays.fill(distances, 0d); - } - - public static CarSource create(int cars) { - return new CarSource(cars); - } - - @Override - public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector) throws Exception { - - while (true) { - Thread.sleep(1000); - for (int carId = 0; carId < speeds.length; carId++) { - if (rand.nextBoolean()) - speeds[carId] = Math.min(100, speeds[carId] + 5); - else - speeds[carId] = Math.max(0, speeds[carId] - 5); - distances[carId] += speeds[carId] / 3.6d; - collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId, speeds[carId], distances[carId], System.currentTimeMillis())); - } - } - } - } - - private static int numOfCars = 2; - private static int evictionSec = 10; - private static double triggerMeters = 50; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if (args.length == 3) { - numOfCars = Integer.valueOf(args[0]); - evictionSec = Integer.valueOf(args[1]); - triggerMeters = Double.valueOf(args[2]); - } else { - System.err.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>"); - return false; - } - } - return true; - } + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings({ "rawtypes", "serial" }) + DataStream topSpeeds = env + .addSource(CarSource.create(numOfCars)) + .groupBy(0) + .window(Time.of(evictionSec, TimeUnit.SECONDS)) + .every(Delta.of(triggerMeters, + new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() { + @Override + public double getDelta( + Tuple4<Integer, Integer, Double, Long> oldDataPoint, + Tuple4<Integer, Integer, Double, Long> newDataPoint) { + return 
newDataPoint.f2 - oldDataPoint.f2; + } + }, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).maxBy(1); + + topSpeeds.print(); + env.execute("CarTopSpeedWindowingExample"); + } + + private static class CarSource implements + SourceFunction<Tuple4<Integer, Integer, Double, Long>> { + + private static final long serialVersionUID = 1L; + private Integer[] speeds; + private Double[] distances; + + private Random rand = new Random(); + + private CarSource(int numOfCars) { + speeds = new Integer[numOfCars]; + distances = new Double[numOfCars]; + Arrays.fill(speeds, 50); + Arrays.fill(distances, 0d); + } + + public static CarSource create(int cars) { + return new CarSource(cars); + } + + @Override + public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector) + throws Exception { + + while (true) { + Thread.sleep(1000); + for (int carId = 0; carId < speeds.length; carId++) { + if (rand.nextBoolean()) { + speeds[carId] = Math.min(100, speeds[carId] + 5); + } else { + speeds[carId] = Math.max(0, speeds[carId] - 5); + } + distances[carId] += speeds[carId] / 3.6d; + collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId, + speeds[carId], distances[carId], System.currentTimeMillis())); + } + } + } + } + + private static int numOfCars = 2; + private static int evictionSec = 10; + private static double triggerMeters = 50; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if (args.length == 3) { + numOfCars = Integer.valueOf(args[0]); + evictionSec = Integer.valueOf(args[1]); + triggerMeters = Double.valueOf(args[2]); + } else { + System.err + .println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>"); + return false; + } + } + return true; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala 
---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala index 10ddfd8..e39fb11 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/streaming/windowing/TopSpeedWindowing.scala @@ -30,10 +30,12 @@ import scala.math.{max, min} import scala.util.Random /** - * An example of grouped stream windowing where different eviction and trigger policies can be used. - * A source fetches events from cars every 1 sec containing their id, their current speed (kmh), + * An example of grouped stream windowing where different eviction and + * trigger policies can be used. A source fetches events from cars + * every 1 sec containing their id, their current speed (kmh), * overall elapsed distance (m) and a timestamp. The streaming - * example triggers the top speed of each car every x meters elapsed for the last y seconds. + * example triggers the top speed of each car every x meters elapsed + * for the last y seconds.
*/ object TopSpeedWindowing { @@ -47,7 +49,8 @@ object TopSpeedWindowing { val env = StreamExecutionEnvironment.getExecutionEnvironment val cars = env.addSource(carSource _).groupBy("carId") .window(Time.of(evictionSec, SECONDS)) - .every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0))) + .every(Delta.of[CarSpeed](triggerMeters, + (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0))) .reduce((x, y) => if (x.speed > y.speed) x else y) cars print http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala index 1c3f5a1..985e512 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/ConnectedDataStream.scala @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,19 +20,19 @@ package org.apache.flink.api.scala.streaming import java.util -import scala.collection.JavaConversions._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment._ -import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream} -import org.apache.flink.streaming.api.function.co.{CoWindowFunction, CoFlatMapFunction, CoMapFunction, CoReduceFunction} -import org.apache.flink.streaming.api.invokable.operator.co.{CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable} +import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaCStream } +import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction } +import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable, CoReduceInvokable } import org.apache.flink.util.Collector +import org.apache.flink.api.scala.streaming.StreamingConversions._ - +import scala.collection.JavaConversions._ import scala.reflect.ClassTag -class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { +class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { /** * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps @@ -40,21 +41,22 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * @param fun2 for each element of the second input. Each * CoMapFunction call returns exactly one element. 
* - * The CoMapFunction used to jointly transform the two input + * The CoMapFunction used to jointly transform the two input * DataStreams * @return The transformed { @link DataStream} */ - def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): DataStream[R] = { + def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): + DataStream[R] = { if (fun1 == null || fun2 == null) { throw new NullPointerException("Map function must not be null.") } - val comapper = new CoMapFunction[IN1,IN2,R] { - def map1(in1: IN1): R = clean(fun1)(in1) - def map2(in2: IN2): R = clean(fun2)(in2) + val comapper = new CoMapFunction[IN1, IN2, R] { + def map1(in1: IN1): R = clean(fun1)(in1) + def map2(in2: IN2): R = clean(fun2)(in2) } - new DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]], - new CoMapInvokable[IN1,IN2,R](comapper))) + new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], + new CoMapInvokable[IN1, IN2, R](comapper))) } /** @@ -67,17 +69,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * the {@link RichFuntion} interface. * * @param coMapper - * The CoMapFunction used to jointly transform the two input + * The CoMapFunction used to jointly transform the two input * DataStreams * @return The transformed { @link DataStream} */ - def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1,IN2,R]): DataStream[R] = { + def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): + DataStream[R] = { if (coMapper == null) { throw new NullPointerException("Map function must not be null.") } - new DataStream(javaStream.addCoFunction("map",implicitly[TypeInformation[R]], - new CoMapInvokable[IN1,IN2,R](coMapper))) + new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], + new CoMapInvokable[IN1, IN2, R](coMapper))) } /** @@ -91,16 +94,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * interface. 
* * @param coFlatMapper - * The CoFlatMapFunction used to jointly transform the two input + * The CoFlatMapFunction used to jointly transform the two input * DataStreams * @return The transformed { @link DataStream} */ - def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1,IN2,R]): DataStream[R] = { + def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): + DataStream[R] = { if (coFlatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]], - new CoFlatMapInvokable[IN1,IN2, R](coFlatMapper))) + new CoFlatMapInvokable[IN1, IN2, R](coFlatMapper))) } /** @@ -110,16 +114,17 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * and @param fun2 for each element of the second * input. Each CoFlatMapFunction call returns any number of elements * including none. - + * * @return The transformed { @link DataStream} */ - def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, fun2: (IN2, Collector[R]) => Unit): DataStream[R] = { + def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, + fun2: (IN2, Collector[R]) => Unit): DataStream[R] = { if (fun1 == null || fun2 == null) { throw new NullPointerException("FlatMap functions must not be null.") } - val flatMapper = new CoFlatMapFunction[IN1,IN2, R] { - def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value,out) - def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value,out) + val flatMapper = new CoFlatMapFunction[IN1, IN2, R] { + def flatMap1(value: IN1, out: Collector[R]): Unit = clean(fun1)(value, out) + def flatMap2(value: IN2, out: Collector[R]): Unit = clean(fun2)(value, out) } flatMap(flatMapper) } @@ -131,15 +136,15 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * {@link ConnectedDataStream#reduce} * * @param keyPosition1 - * 
The field used to compute the hashcode of the elements in the + * The field used to compute the hashcode of the elements in the * first input stream. * @param keyPosition2 - * The field used to compute the hashcode of the elements in the + * The field used to compute the hashcode of the elements in the * second input stream. * @return @return The transformed { @link ConnectedDataStream} */ - def groupBy(keyPosition1 : Int,keyPosition2: Int) : ConnectedDataStream[IN1,IN2] = { - new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPosition1,keyPosition2)) + def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = { + javaStream.groupBy(keyPosition1, keyPosition2) } /** @@ -149,13 +154,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * {@link ConnectedDataStream#reduce} * * @param keyPositions1 - * The fields used to group the first input stream. + * The fields used to group the first input stream. * @param keyPositions2 - * The fields used to group the second input stream. + * The fields used to group the second input stream. * @return @return The transformed { @link ConnectedDataStream} */ - def groupBy(keyPositions1 : Array[Int],keyPositions2: Array[Int]) : ConnectedDataStream[IN1,IN2] = { - new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyPositions1,keyPositions2)) + def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): + ConnectedDataStream[IN1, IN2] = { + javaStream.groupBy(keyPositions1, keyPositions2) } /** @@ -166,13 +172,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * to drill down into objects, as in {@code "field1.getInnerField2()" }. 
* * @param field1 - * The grouping expression for the first input + * The grouping expression for the first input * @param field2 - * The grouping expression for the second input + * The grouping expression for the second input * @return The grouped { @link ConnectedDataStream} */ - def groupBy(field1 : String, field2: String) : ConnectedDataStream[IN1,IN2] = { - new ConnectedDataStream[IN1,IN2](javaStream.groupBy(field1,field2)) + def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = { + javaStream.groupBy(field1, field2) } /** @@ -184,13 +190,14 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * . * * @param fields1 - * The grouping expressions for the first input + * The grouping expressions for the first input * @param fields2 - * The grouping expressions for the second input + * The grouping expressions for the second input * @return The grouped { @link ConnectedDataStream} */ - def groupBy(fields1 : Array[String],fields2: Array[String]) : ConnectedDataStream[IN1,IN2] = { - new ConnectedDataStream[IN1,IN2](javaStream.groupBy(fields1,fields2)) + def groupBy(fields1: Array[String], fields2: Array[String]): + ConnectedDataStream[IN1, IN2] = { + javaStream.groupBy(fields1, fields2) } /** @@ -200,12 +207,13 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * {@link ConnectedDataStream#reduce} * * @param fun1 - * The function used for grouping the first input + * The function used for grouping the first input * @param fun2 - * The function used for grouping the second input + * The function used for grouping the second input * @return @return The transformed { @link ConnectedDataStream} */ - def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): ConnectedDataStream[IN1,IN2] = { + def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): + ConnectedDataStream[IN1, IN2] = { val keyExtractor1 = new KeySelector[IN1, Any] { def getKey(in: IN1) = clean(fun1)(in) @@ -214,7 +222,7 @@ 
class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { def getKey(in: IN2) = clean(fun2)(in) } - new ConnectedDataStream[IN1,IN2](javaStream.groupBy(keyExtractor1,keyExtractor2)) + javaStream.groupBy(keyExtractor1, keyExtractor2) } /** @@ -227,17 +235,18 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * the reduce function can be applied incrementally. * * @param coReducer - * The { @link CoReduceFunction} that will be called for every + * The { @link CoReduceFunction} that will be called for every * element of the inputs. * @return The transformed { @link DataStream}. */ - def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1,IN2,R]): DataStream[R] = { + def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): + DataStream[R] = { if (coReducer == null) { throw new NullPointerException("Reduce function must not be null.") } new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]], - new CoReduceInvokable[IN1,IN2,R](coReducer))) + new CoReduceInvokable[IN1, IN2, R](coReducer))) } /** @@ -256,20 +265,20 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * * @return The transformed { @link DataStream}. 
*/ - def reduce[R: TypeInformation: ClassTag](reducer1: (IN1,IN1) => IN1, reducer2: (IN2,IN2) => IN2, - mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = { + def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1, + reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = { if (mapper1 == null || mapper2 == null) { throw new NullPointerException("Map functions must not be null.") } if (reducer1 == null || reducer2 == null) { throw new NullPointerException("Reduce functions must not be null.") } - - val reducer = new CoReduceFunction[IN1,IN2,R] { - def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1,value2) - def map2(value: IN2): R = clean(mapper2)(value) - def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1,value2) - def map1(value: IN1): R = clean(mapper1)(value) + + val reducer = new CoReduceFunction[IN1, IN2, R] { + def reduce1(value1: IN1, value2: IN1): IN1 = clean(reducer1)(value1, value2) + def map2(value: IN2): R = clean(mapper2)(value) + def reduce2(value1: IN2, value2: IN2): IN2 = clean(reducer2)(value1, value2) + def map1(value: IN1): R = clean(mapper1)(value) } reduce(reducer) } @@ -281,23 +290,24 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * default to compute windows. * * @param coWindowFunction - * The { @link CoWindowFunction} that will be applied for the time + * The { @link CoWindowFunction} that will be applied for the time * windows. * @param windowSize - * Size of the windows that will be aligned for both streams in + * Size of the windows that will be aligned for both streams in * milliseconds. * @param slideInterval - * After every function call the windows will be slid by this + * After every function call the windows will be slid by this * interval. * * @return The transformed { @link DataStream}. 
*/ - def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: CoWindowFunction[IN1,IN2,R], windowSize:Long, slideInterval: Long) = { - if(coWindowFunction == null){ + def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: + CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = { + if (coWindowFunction == null) { throw new NullPointerException("CoWindow function must no be null") } - new DataStream[R](javaStream.windowReduce(coWindowFunction, windowSize, slideInterval)) + javaStream.windowReduce(coWindowFunction, windowSize, slideInterval) } /** @@ -307,26 +317,28 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * default to compute windows. * * @param coWindower - * The coWindowing function to be applied for the time windows. + * The coWindowing function to be applied for the time windows. * @param windowSize - * Size of the windows that will be aligned for both streams in + * Size of the windows that will be aligned for both streams in * milliseconds. * @param slideInterval - * After every function call the windows will be slid by this + * After every function call the windows will be slid by this * interval. * * @return The transformed { @link DataStream}. 
*/ - def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], Collector[R]) => Unit , windowSize:Long, slideInterval: Long) = { - if(coWindower == null){ + def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], + Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = { + if (coWindower == null) { throw new NullPointerException("CoWindow function must no be null") } - val coWindowFun = new CoWindowFunction[IN1,IN2,R] { - def coWindow(first: util.List[IN1], second: util.List[IN2], out: Collector[R]): Unit = clean(coWindower)(first,second,out) + val coWindowFun = new CoWindowFunction[IN1, IN2, R] { + def coWindow(first: util.List[IN1], second: util.List[IN2], + out: Collector[R]): Unit = clean(coWindower)(first, second, out) } - new DataStream[R](javaStream.windowReduce(coWindowFun, windowSize, slideInterval)) + javaStream.windowReduce(coWindowFun, windowSize, slideInterval) } /** @@ -335,7 +347,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * @return The first DataStream. */ def getFirst(): DataStream[IN1] = { - new DataStream[IN1](javaStream.getFirst) + javaStream.getFirst } /** @@ -344,7 +356,7 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { * @return The second DataStream. 
*/ def getSecond(): DataStream[IN2] = { - new DataStream[IN2](javaStream.getSecond) + javaStream.getSecond } /** @@ -365,5 +377,4 @@ class ConnectedDataStream[IN1,IN2] (javaStream: JavaCStream[IN1,IN2]) { javaStream.getInputType2 } - } http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 546d8a9..ccfd176 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -19,22 +19,20 @@ package org.apache.flink.api.scala.streaming import org.apache.flink.api.scala._ import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, + SingleOutputStreamOperator, GroupedDataStream} import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.invokable.operator.MapInvokable -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator import org.apache.flink.util.Collector import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.streaming.api.invokable.StreamInvokable -import org.apache.flink.streaming.api.datastream.GroupedDataStream import 
org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable -import org.apache.flink.streaming.api.datastream.GroupedDataStream import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.common.functions.FilterFunction @@ -94,6 +92,15 @@ class DataStream[T](javaStream: JavaStream[T]) { javaStream.merge(dataStreams.map(_.getJavaStream): _*) /** + * Creates a new ConnectedDataStream by connecting + * DataStream outputs of different type with each other. The + * DataStreams connected using this operators can be used with CoFunctions. + * + */ + def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = + javaStream.connect(dataStream.getJavaStream) + + /** * Groups the elements of a DataStream by the given key positions (for tuple/array types) to * be used with grouped operators like grouped reduce or grouped aggregations * http://git-wip-us.apache.org/repos/asf/flink/blob/7eaf726a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala index a34d0dc..9aefa04 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala @@ -21,9 +21,10 @@ package org.apache.flink.api.scala.streaming import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import 
org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } object StreamingConversions { - + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = new DataStream[R](javaStream) @@ -33,4 +34,7 @@ object StreamingConversions { implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = new SplitDataStream[R](javaStream) + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) + }
