http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala new file mode 100644 index 0000000..b7d1546 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.streaming.windowing + +import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } +import org.apache.commons.lang.Validate +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction + +object Delta { + + /** + * Creates a delta helper representing a delta trigger or eviction policy. + * </br></br> This policy calculates a delta between the data point which + * triggered last and the currently arrived data point. It triggers if the + * delta is higher than a specified threshold. </br></br> In case it gets + * used for eviction, this policy starts from the first element of the + * buffer and removes all elements from the buffer which have a higher delta + * then the threshold. As soon as there is an element with a lower delta, + * the eviction stops. + */ + def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = { + Validate.notNull(deltaFunction, "Delta function must not be null") + val df = new DeltaFunction[T] { + val cleanFun = clean(deltaFunction) + override def getDelta(first: T, second: T) = cleanFun(first, second) + } + JavaDelta.of(threshold, df, initVal) + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala new file mode 100644 index 0000000..62a47c2 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.streaming.windowing + +import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.commons.net.ntp.TimeStamp +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.commons.lang.Validate + +object Time { + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using System time. + * + */ + def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = + JavaTime.of(windowSize, timeUnit) + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user defined timestamp extractor. + * + */ + def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = { + Validate.notNull(timestamp, "Timestamp must not be null.") + val ts = new Timestamp[R] { + val fun = clean(timestamp, true) + override def getTimestamp(in: R) = fun(in) + } + JavaTime.of(windowSize, ts, startTime) + } + +}
