http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
deleted file mode 100644
index bcd586e..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.environment.{ StreamExecutionEnvironment => JavaEnv }
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.commons.lang.Validate
-import scala.reflect.ClassTag
-import org.apache.flink.streaming.api.datastream.DataStreamSource
-import org.apache.flink.streaming.api.invokable.SourceInvokable
-import org.apache.flink.streaming.api.function.source.FromElementsFunction
-import org.apache.flink.streaming.api.function.source.SourceFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.util.Collector
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamExecutionEnvironment(javaEnv: JavaEnv) {
-
-  /**
-   * Sets the degree of parallelism (DOP) for operations executed through this environment.
-   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
-   * x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
-   */
-  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-  }
-
-  /**
-   * Returns the default degree of parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]].
-   */
-  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
-
-  /**
-   * Sets the maximum time frequency (milliseconds) for the flushing of the
-   * output buffers. By default the output buffers flush frequently to provide
-   * low latency and a smooth developer experience. Setting the parameter
-   * can result in three logical modes:
-   *
-   * <ul>
-   * <li>A positive integer triggers flushing periodically with that interval</li>
-   * <li>0 triggers flushing after every record, thus minimizing latency</li>
-   * <li>-1 triggers flushing only when the output buffer is full, thus maximizing
-   * throughput</li>
-   * </ul>
-   */
-  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
-    javaEnv.setBufferTimeout(timeoutMillis)
-    this
-  }
-
-  /**
-   * Gets the default buffer timeout set for this environment.
-   */
-  def getBufferTimout: Long = javaEnv.getBufferTimeout()
-
-  /**
-   * Creates a DataStream that represents the strings produced by reading the
-   * given file line by line. The file will be read with the system's default
-   * character set.
-   */
-  def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
-
-  /**
-   * Creates a DataStream that represents the strings produced by reading the
-   * given file line by line multiple times (indefinitely). The file will be read
-   * with the system's default character set. This functionality can be used for
-   * testing a topology.
-   */
-  def readTextStream(StreamPath: String): DataStream[String] =
-    javaEnv.readTextStream(StreamPath)
-
-  /**
-   * Creates a new DataStream that contains the strings received indefinitely
-   * from the given socket. Received strings are decoded by the system's default
-   * character set.
-   */
-  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port, delimiter)
-
-  /**
-   * Creates a new DataStream that contains the strings received indefinitely
-   * from the given socket. Received strings are decoded by the system's default
-   * character set, using '\n' as the delimiter.
-   */
-  def socketTextStream(hostname: String, port: Int): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers.
-   */
-  def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type and must be serializable.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
-    Validate.notNull(data, "Data must not be null.")
-    val typeInfo = implicitly[TypeInformation[T]]
-
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-      .asJavaCollection(data))
-
-    javaEnv.addSource(sourceFunction, typeInfo)
-  }
-
-  /**
-   * Creates a DataStream using a user-defined source function for arbitrary
-   * source functionality.
-   */
-  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val cleanFun = StreamExecutionEnvironment.clean(function)
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun, typeInfo)
-  }
-
-  /**
-   * Creates a DataStream using a user-defined source function for arbitrary
-   * source functionality.
-   */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val sourceFunction = new SourceFunction[T] {
-      val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def invoke(out: Collector[T]) {
-        cleanFun(out)
-      }
-    }
-    addSource(sourceFunction)
-  }
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with a generated
-   * default name.
-   */
-  def execute() = javaEnv.execute()
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with the provided name.
-   */
-  def execute(jobName: String) = javaEnv.execute(jobName)
-
-}
-
-object StreamExecutionEnvironment {
-
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
-  /**
-   * Creates an execution environment that represents the context in which the program is
-   * currently executed. If the program is invoked standalone, this method returns a local
-   * execution environment. If the program is invoked from within the command line client
-   * to be submitted to a cluster, this method returns the execution environment of this cluster.
-   */
-  def getExecutionEnvironment: StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
-  }
-
-  /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
-   */
-  def createLocalEnvironment(
-    degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
-    StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program to
-   * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses user-defined functions, user-defined input formats, or any
-   *                 libraries, those must be provided in the JAR files.
-   */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
-    StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program
-   * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified degree of parallelism.
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param degreeOfParallelism The degree of parallelism to use during the execution.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses user-defined functions, user-defined input formats, or any
-   *                 libraries, those must be provided in the JAR files.
-   */
-  def createRemoteEnvironment(
-    host: String,
-    port: Int,
-    degreeOfParallelism: Int,
-    jarFiles: String*): StreamExecutionEnvironment = {
-    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-    new StreamExecutionEnvironment(javaEnv)
-  }
-}
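For reference, a minimal sketch of how this removed environment wrapper was typically driven. It assumes the companion DataStream.scala wrapper from this package (deleted in the same commit, but not shown in this excerpt) provides print(); the job name is illustrative:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setDegreeOfParallelism(4)

    // fromElements builds a non-parallel source (degree of parallelism 1)
    val text = env.fromElements("to be", "or not to be")

    // print() is assumed from the companion DataStream wrapper, not this file
    text.print()

    // nothing runs until a sink exists and execute() is called
    env.execute("streaming sketch")
  }
}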
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
deleted file mode 100644
index 8d8a0b0..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.TemporalOperator
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.util.keys.KeySelectorUtil
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-  TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator() = {
-    new StreamJoinOperator.JoinWindow[I1, I2](this)
-  }
-}
-
-object StreamJoinOperator {
-
-  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
-
-    private[flink] val type1 = op.input1.getType()
-
-    /**
-     * Continues a temporal join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(fields: Int*) = {
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type1), type1))
-    }
-
-    /**
-     * Continues a temporal join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(firstField: String, otherFields: String*) =
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1))
-
-    /**
-     * Continues a temporal join transformation by defining
-     * the KeySelector function that will be used to extract keys from the first stream
-     * for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where[K: TypeInformation](fun: (I1) => K) = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I1, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I1) = cleanFun(in)
-      }
-      new JoinPredicate[I1, I2](op, keyExtractor)
-    }
-
-  }
-
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
-      private[flink] val keys1: KeySelector[I1, _]) {
-    private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType()
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wraps each joined element pair in a Tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type2), type2))
-    }
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wraps each joined element pair in a Tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wraps each joined element pair in a Tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I2, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I2) = cleanFun(in)
-      }
-      finish(keyExtractor)
-    }
-
-    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
-      this.keys2 = keys2
-      new JoinedStream[I1, I2](this, createJoinOperator())
-    }
-
-    private def createJoinOperator(): JavaStream[(I1, I2)] = {
-
-      val returnType = new CaseClassTypeInfo[(I1, I2)](
-        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
-
-        override def createSerializer: TypeSerializer[(I1, I2)] = {
-          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-          for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer
-          }
-
-          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-            override def createInstance(fields: Array[AnyRef]) = {
-              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-            }
-          }
-        }
-      }
-
-      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
-        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
-    }
-  }
-
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
-    DataStream[(I1, I2)](javaStream) {
-
-    private val op = jp.op
-
-    /**
-     * Sets a wrapper for the joined elements. For each joined pair, the result of the
-     * UDF call will be emitted.
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
-        op.timeStamp2)
-
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
-        invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
-    }
-  }
-
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
-      joinFunction: (I1, I2) => R) = {
-    Validate.notNull(joinFunction, "Join function must not be null.")
-
-    val joinFun = new JoinFunction[I1, I2, R] {
-
-      val cleanFun = jp.op.input1.clean(joinFunction)
-
-      override def join(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
-  }
-
-}
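Taken together, the classes above support call chains like the following hypothetical sketch. The join(...) entry point lives on the companion DataStream wrapper and onWindow(...) on TemporalOperator, neither of which is part of this excerpt, so their exact signatures are assumptions:

case class Order(id: Int, item: String)
case class Shipment(orderId: Int, status: String)

// Default wrapping: JoinedStream emits each matched pair as a Tuple2 (first, second).
val pairs = orders.join(shipments)
  .onWindow(1000)              // assumed window configuration from TemporalOperator
  .where(_.id)                 // JoinWindow.where: key of the first input
  .equalTo(_.orderId)          // JoinPredicate.equalTo: key of the second input

// Custom wrapping via JoinedStream.apply(...): one result element per joined pair.
val report = orders.join(shipments)
  .onWindow(1000)
  .where(_.id).equalTo(_.orderId)
  .apply((o, s) => o.item + ": " + s.status)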
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
deleted file mode 100644
index 9aefa04..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamingConversions.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
-
-object StreamingConversions {
-
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-
-  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
-    new WindowedDataStream[R](javaWStream)
-
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
-    new SplitDataStream[R](javaStream)
-
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
-    ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
-
-}
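These implicits are what allow the wrappers in this package to return the Java stream types directly; a small illustration (the wrap helper is hypothetical):

import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.api.scala.streaming.DataStream
import org.apache.flink.api.scala.streaming.StreamingConversions._

// Any call that hands back a Java DataStream can be used where the Scala
// wrapper is expected; javaToScalaStream is applied implicitly.
def wrap[T](javaStream: JavaStream[T]): DataStream[T] = javaStream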
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
deleted file mode 100644
index 2f9c792..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.streaming.api.invokable.StreamInvokable
-import scala.collection.JavaConversions._
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.api.scala.streaming.StreamingConversions._
-
-class WindowedDataStream[T](javaStream: JavaWStream[T]) {
-
-  /**
-   * Defines the slide size (trigger frequency) for the windowed data stream.
-   * This controls how often the user-defined function will be triggered on
-   * the window.
-   */
-  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    javaStream.every(windowingHelper: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field positions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user-defined functions will be applied on a per-group basis.
-   * <br/><br/> To get windows and triggers on a per-group basis, apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   */
-  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field expressions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user-defined functions will be applied on a per-group basis.
-   * <br/><br/> To get windows and triggers on a per-group basis, apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   */
-  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-    javaStream.groupBy(firstField +: otherFields.toArray: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * KeySelector function. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user-defined functions will be applied on a per-group basis.
-   * <br/><br/> To get windows and triggers on a per-group basis, apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   */
-  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.groupBy(keyExtractor)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream.reduce(reducer)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce
-   * operator, groupReduce exposes the whole window through the Iterable interface.
-   * <br/><br/>
-   * Whenever possible, try to use reduce instead of groupReduce for increased
-   * efficiency.
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
-    DataStream[R] = {
-    if (reducer == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce
-   * operator, groupReduce exposes the whole window through the Iterable interface.
-   * <br/><br/>
-   * Whenever possible, try to use reduce instead of groupReduce for increased
-   * efficiency.
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
-    DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    val reducer = new GroupReduceFunction[T, R] {
-      val cleanFun = clean(fun)
-      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
-    }
-    reduceGroup(reducer)
-  }
-
-  /**
-   * Applies an aggregation that gives the maximum of the elements in the window at
-   * the given position.
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-
-  /**
-   * Applies an aggregation that gives the minimum of the elements in the window at
-   * the given position.
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-
-  /**
-   * Applies an aggregation that gives the maximum element of the window by
-   * the given position. In case of equality, the first element is returned.
-   */
-  def maxBy(position: Int, first: Boolean = true): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position, first)
-
-  /**
-   * Applies an aggregation that gives the minimum element of the window by
-   * the given position. In case of equality, the first element is returned.
-   */
-  def minBy(position: Int, first: Boolean = true): DataStream[T] =
-    aggregate(AggregationType.MINBY, position, first)
-
-  def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
-    DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
-    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
-
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
-        outType.getTypeAt(position).getTypeClass()))
-      case _ => new agg.ProductComparableAggregator(aggregationType, first)
-    }
-
-    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
-  }
-
-}
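A sketch of how this windowed stream was used. The window(...) entry point is defined on the DataStream wrapper (outside this excerpt) and Time is the helper from windowing/Time.scala below, so treat the chain as an assumption-laden example:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.scala.streaming.windowing.Time

// readings: a DataStream[(String, Double)] of (sensorId, value), assumed to exist
val windowMax = readings
  .window(Time.of(5, TimeUnit.SECONDS)) // eviction: keep five seconds of data
  .every(Time.of(1, TimeUnit.SECONDS))  // trigger: evaluate the window once per second
  .groupBy(0)                           // apply the user function per sensor id
  .maxBy(1)                             // per window, keep the element with the largest value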
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
deleted file mode 100644
index b7d1546..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming.windowing
-
-import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.commons.lang.Validate
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
-
-object Delta {
-
-  /**
-   * Creates a delta helper representing a delta trigger or eviction policy.
-   * <br/><br/> This policy calculates a delta between the data point which
-   * triggered last and the currently arrived data point. It triggers if the
-   * delta is higher than a specified threshold. <br/><br/> In case it gets
-   * used for eviction, this policy starts from the first element of the
-   * buffer and removes all elements from the buffer which have a higher delta
-   * than the threshold. As soon as there is an element with a lower delta,
-   * the eviction stops.
-   */
-  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
-    Validate.notNull(deltaFunction, "Delta function must not be null")
-    val df = new DeltaFunction[T] {
-      val cleanFun = clean(deltaFunction)
-      override def getDelta(first: T, second: T) = cleanFun(first, second)
-    }
-    JavaDelta.of(threshold, df, initVal)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8183c8c3/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
deleted file mode 100644
index 62a47c2..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.streaming.windowing
-
-import java.util.concurrent.TimeUnit
-import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.commons.net.ntp.TimeStamp
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
-import org.apache.commons.lang.Validate
-
-object Time {
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using system time.
-   */
-  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
-    JavaTime.of(windowSize, timeUnit)
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using a user-defined timestamp extractor.
-   */
-  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
-    Validate.notNull(timestamp, "Timestamp must not be null.")
-    val ts = new Timestamp[R] {
-      val fun = clean(timestamp, true)
-      override def getTimestamp(in: R) = fun(in)
-    }
-    JavaTime.of(windowSize, ts, startTime)
-  }
-
-}
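Both helpers plug into the same window(...)/every(...) entry points on the stream classes (not part of this excerpt); a final hedged sketch combining them:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.scala.streaming.windowing.{ Delta, Time }

// readings: a DataStream[(Int, Double)] of (sensorId, value), assumed to exist
val peaks = readings
  // eviction by change in value: drop buffered points whose delta to the
  // last trigger element exceeds 0.5, starting from the initial value (0, 0.0)
  .window(Delta.of[(Int, Double)](0.5, (a, b) => math.abs(a._2 - b._2), (0, 0.0)))
  // trigger once per second of system time
  .every(Time.of(1, TimeUnit.SECONDS))
  .reduce((a, b) => if (a._2 > b._2) a else b)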