[streaming] Temporal operator windowing syntax update

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b0a2e4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b0a2e4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b0a2e4a1

Branch: refs/heads/master
Commit: b0a2e4a14e0161e753ec3296d4ead8e46ac8e303
Parents: 92ceacd
Author: Gyula Fora <[email protected]>
Authored: Mon Jan 5 15:24:59 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Tue Jan 6 15:09:04 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |   4 +
 .../streaming/api/datastream/DataStream.java    |   4 +-
 .../api/datastream/DataStreamSink.java          |   2 +-
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../api/datastream/IterativeDataStream.java     |   2 +-
 .../datastream/SingleOutputStreamOperator.java  |   2 +-
 .../api/datastream/StreamCrossOperator.java     |  86 ------
 .../api/datastream/StreamJoinOperator.java      | 257 ------------------
 .../api/datastream/TemporalOperator.java        | 102 -------
 .../temporaloperator/StreamCrossOperator.java   | 101 +++++++
 .../temporaloperator/StreamJoinOperator.java    | 272 +++++++++++++++++++
 .../temporaloperator/TemporalOperator.java      | 124 +++++++++
 .../temporaloperator/TemporalWindow.java        |  45 +++
 .../operator/co/CoWindowInvokable.java          |   4 +
 .../streaming/api/windowing/helper/Time.java    |  20 +-
 .../streaming/api/WindowCrossJoinTest.java      |  21 +-
 .../streaming/examples/join/WindowJoin.java     |   3 +-
 .../scala/examples/windowing/WindowJoin.scala   |   7 +-
 .../api/scala/StreamCrossOperator.scala         |  17 +-
 .../api/scala/StreamJoinOperator.scala          |  47 ++--
 .../streaming/api/scala/TemporalOperator.scala  |  43 +++
 21 files changed, 662 insertions(+), 503 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index c826274..441360c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -509,6 +509,10 @@ public class JobGraphBuilder {
                invokableObjects.put(id, invokableObject);
        }
 
+       public StreamInvokable<?, ?> getInvokable(String id) {
+               return invokableObjects.get(id);
+       }
+
        public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
                StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType);
                typeSerializersOut1.put(id, serializer);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8d0020e..e969647 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -38,6 +38,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
+import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
+import 
org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
@@ -1180,7 +1182,7 @@ public class DataStream<OUT> {
         * 
         * @return The copy
         */
-       protected DataStream<OUT> copy() {
+       public DataStream<OUT> copy() {
                return new DataStream<OUT>(this);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 369c3eb..61fc557 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -38,7 +38,7 @@ public class DataStreamSink<IN> extends 
SingleOutputStreamOperator<IN, DataStrea
        }
 
        @Override
-       protected DataStreamSink<IN> copy() {
+       public DataStreamSink<IN> copy() {
                throw new RuntimeException("Data stream sinks cannot be 
copied");
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 160ef8d..4a5c0c2 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -199,7 +199,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> 
{
        }
 
        @Override
-       protected GroupedDataStream<OUT> copy() {
+       public GroupedDataStream<OUT> copy() {
                return new GroupedDataStream<OUT>(this);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 78518c0..d7467d1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -72,7 +72,7 @@ public class IterativeDataStream<IN> extends
        }
 
        @Override
-       protected IterativeDataStream<IN> copy() {
+       public IterativeDataStream<IN> copy() {
                return new IterativeDataStream<IN>(this, iterationID, waitTime);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index c19517b..4b6edc0 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -186,7 +186,7 @@ public class SingleOutputStreamOperator<OUT, O extends 
SingleOutputStreamOperato
        }
 
        @Override
-       protected SingleOutputStreamOperator<OUT, O> copy() {
+       public SingleOutputStreamOperator<OUT, O> copy() {
                return new SingleOutputStreamOperator<OUT, O>(this);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
deleted file mode 100644
index 2dd7bc0..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.CrossOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-
-public class StreamCrossOperator<I1, I2> extends
-               TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, 
I2>> {
-
-       public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> 
input2) {
-               super(input1, input2);
-       }
-
-       @Override
-       protected CrossWindow<I1, I2> createNextWindowOperator() {
-
-               CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction 
= new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
-                               input1.clean(new 
CrossOperator.DefaultCrossFunction<I1, I2>()));
-
-               return new CrossWindow<I1, I2>(this, 
input1.connect(input2).addGeneralWindowCombine(
-                               crossWindowFunction,
-                               new TupleTypeInfo<Tuple2<I1, 
I2>>(input1.getType(), input2.getType()), windowSize,
-                               slideInterval, timeStamp1, timeStamp2));
-       }
-
-       public static class CrossWindow<I1, I2> extends
-                       SingleOutputStreamOperator<Tuple2<I1, I2>, 
CrossWindow<I1, I2>> {
-
-               private StreamCrossOperator<I1, I2> op;
-
-               public CrossWindow(StreamCrossOperator<I1, I2> op, 
DataStream<Tuple2<I1, I2>> ds) {
-                       super(ds);
-                       this.op = op;
-               }
-
-               /**
-                * Finalizes a temporal Cross transformation by applying a
-                * {@link CrossFunction} to each pair of crossed elements.<br/>
-                * Each CrossFunction call returns exactly one element.
-                * 
-                * @param function
-                *            The CrossFunction that is called for each pair of 
crossed
-                *            elements.
-                * @return The crossed data streams
-                * 
-                */
-               public <R> SingleOutputStreamOperator<R, ?> 
with(CrossFunction<I1, I2, R> function) {
-                       TypeInformation<R> outTypeInfo = 
TypeExtractor.getCrossReturnTypes(function,
-                                       op.input1.getType(), 
op.input2.getType());
-
-                       CoWindowInvokable<I1, I2, R> invokable = new 
CoWindowInvokable<I1, I2, R>(
-                                       new CrossWindowFunction<I1, I2, 
R>(clean(function)), op.windowSize,
-                                       op.slideInterval, op.timeStamp1, 
op.timeStamp2);
-
-                       jobGraphBuilder.setInvokable(id, invokable);
-
-                       return setType(outTypeInfo);
-
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
deleted file mode 100644
index de15515..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-public class StreamJoinOperator<I1, I2> extends
-               TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> 
{
-
-       public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) 
{
-               super(input1, input2);
-       }
-
-       @Override
-       protected JoinWindow<I1, I2> createNextWindowOperator() {
-               return new JoinWindow<I1, I2>(this);
-       }
-
-       public static class JoinWindow<I1, I2> {
-
-               private StreamJoinOperator<I1, I2> op;
-               private TypeInformation<I1> type1;
-
-               private JoinWindow(StreamJoinOperator<I1, I2> operator) {
-                       this.op = operator;
-                       this.type1 = op.input1.getType();
-               }
-
-               /**
-                * Continues a temporal Join transformation. <br/>
-                * Defines the {@link Tuple} fields of the first join {@link 
DataStream}
-                * that should be used as join keys.<br/>
-                * <b>Note: Fields can only be selected as join keys on Tuple
-                * DataStreams.</b><br/>
-                * 
-                * @param fields
-                *            The indexes of the other Tuple fields of the 
first join
-                *            DataStreams that should be used as keys.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public JoinPredicate<I1, I2> where(int... fields) {
-                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1));
-               }
-
-               /**
-                * Continues a temporal join transformation. <br/>
-                * Defines the fields of the first join {@link DataStream} that 
should
-                * be used as grouping keys. Fields are the names of member 
fields of
-                * the underlying type of the data stream.
-                * 
-                * @param fields
-                *            The fields of the first join DataStream that 
should be
-                *            used as keys.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public JoinPredicate<I1, I2> where(String... fields) {
-                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1));
-               }
-
-               /**
-                * Continues a temporal Join transformation and defines a
-                * {@link KeySelector} function for the first join {@link 
DataStream}
-                * .</br> The KeySelector function is called for each element 
of the
-                * first DataStream and extracts a single key value on which the
-                * DataStream is joined. </br>
-                * 
-                * @param keySelector
-                *            The KeySelector function which extracts the key 
values
-                *            from the DataStream on which it is joined.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> 
keySelector) {
-                       return new JoinPredicate<I1, I2>(op, keySelector);
-               }
-
-               // 
----------------------------------------------------------------------------------------
-
-       }
-
-       /**
-        * Intermediate step of a temporal Join transformation. <br/>
-        * To continue the Join transformation, select the join key of the 
second
-        * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
-        * 
-        */
-       public static class JoinPredicate<I1, I2> {
-
-               private StreamJoinOperator<I1, I2> op;
-               private KeySelector<I1, ?> keys1;
-               private KeySelector<I2, ?> keys2;
-               private TypeInformation<I2> type2;
-
-               private JoinPredicate(StreamJoinOperator<I1, I2> operator, 
KeySelector<I1, ?> keys1) {
-                       this.op = operator;
-                       this.keys1 = keys1;
-                       this.type2 = op.input2.getType();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines the 
{@link Tuple}
-                * fields of the second join {@link DataStream} that should be 
used as
-                * join keys.<br/>
-                * </p> The resulting operator wraps each pair of joining 
elements in a
-                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
-                * use {@link JoinedStream#with(JoinFunction)}
-                * 
-                * @param fields
-                *            The indexes of the Tuple fields of the second join
-                *            DataStream that should be used as keys.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public JoinedStream<I1, I2> equalTo(int... fields) {
-                       keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields, type2),
-                                       type2);
-                       return createJoinOperator();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines the 
fields of the
-                * second join {@link DataStream} that should be used as join 
keys. </p>
-                * The resulting operator wraps each pair of joining elements 
in a
-                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
-                * use {@link JoinedStream#with(JoinFunction)}
-                * 
-                * @param fields
-                *            The fields of the second join DataStream that 
should be
-                *            used as keys.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public JoinedStream<I1, I2> equalTo(String... fields) {
-                       this.keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields,
-                                       type2), type2);
-                       return createJoinOperator();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines a
-                * {@link KeySelector} function for the second join {@link 
DataStream}
-                * .</br> The KeySelector function is called for each element 
of the
-                * second DataStream and extracts a single key value on which 
the
-                * DataStream is joined. </p> The resulting operator wraps each 
pair of
-                * joining elements in a Tuple2<I1,I2>(first, second). To use a
-                * different wrapping function use
-                * {@link JoinedStream#with(JoinFunction)}
-                * 
-                * 
-                * @param keySelector
-                *            The KeySelector function which extracts the key 
values
-                *            from the second DataStream on which it is joined.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> 
keySelector) {
-                       this.keys2 = keySelector;
-                       return createJoinOperator();
-               }
-
-               private JoinedStream<I1, I2> createJoinOperator() {
-
-                       JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new 
DefaultJoinFunction<I1, I2>();
-
-                       JoinWindowFunction<I1, I2, Tuple2<I1, I2>> 
joinWindowFunction = getJoinWindowFunction(
-                                       joinFunction, this);
-
-                       TypeInformation<Tuple2<I1, I2>> outType = new 
TupleTypeInfo<Tuple2<I1, I2>>(
-                                       op.input1.getType(), 
op.input2.getType());
-
-                       return new JoinedStream<I1, I2>(this, op.input1
-                                       .groupBy(keys1)
-                                       .connect(op.input2.groupBy(keys2))
-                                       
.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-                                                       op.slideInterval, 
op.timeStamp1, op.timeStamp2));
-               }
-       }
-
-       public static class JoinedStream<I1, I2> extends
-                       SingleOutputStreamOperator<Tuple2<I1, I2>, 
JoinedStream<I1, I2>> {
-               private final JoinPredicate<I1, I2> predicate;
-
-               private JoinedStream(JoinPredicate<I1, I2> predicate, 
DataStream<Tuple2<I1, I2>> ds) {
-                       super(ds);
-                       this.predicate = predicate;
-               }
-
-               /**
-                * Completes a stream join. </p> The resulting operator wraps 
each pair
-                * of joining elements using the user defined {@link 
JoinFunction}
-                * 
-                * @return The joined data stream.
-                */
-               public <OUT> SingleOutputStreamOperator<OUT, ?> 
with(JoinFunction<I1, I2, OUT> joinFunction) {
-
-                       TypeInformation<OUT> outType = 
TypeExtractor.getJoinReturnTypes(joinFunction,
-                                       predicate.op.input1.getType(), 
predicate.op.input2.getType());
-
-                       CoWindowInvokable<I1, I2, OUT> invokable = new 
CoWindowInvokable<I1, I2, OUT>(
-                                       getJoinWindowFunction(joinFunction, 
predicate), predicate.op.windowSize,
-                                       predicate.op.slideInterval, 
predicate.op.timeStamp1, predicate.op.timeStamp2);
-
-                       jobGraphBuilder.setInvokable(id, invokable);
-
-                       return setType(outType);
-               }
-       }
-
-       public static final class DefaultJoinFunction<I1, I2> implements
-                       JoinFunction<I1, I2, Tuple2<I1, I2>> {
-
-               private static final long serialVersionUID = 1L;
-               private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
-
-               @Override
-               public Tuple2<I1, I2> join(I1 first, I2 second) throws 
Exception {
-                       outTuple.f0 = first;
-                       outTuple.f1 = second;
-                       return outTuple;
-               }
-       }
-
-       public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> 
getJoinWindowFunction(
-                       JoinFunction<I1, I2, OUT> joinFunction, 
JoinPredicate<I1, I2> predicate) {
-               return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, 
predicate.keys2, joinFunction);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
deleted file mode 100644
index e5385f0..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public abstract class TemporalOperator<I1, I2, OP> {
-
-       public final DataStream<I1> input1;
-       public final DataStream<I2> input2;
-
-       public long windowSize;
-       public long slideInterval;
-
-       public TimestampWrapper<I1> timeStamp1;
-       public TimestampWrapper<I2> timeStamp2;
-
-       public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
-               if (input1 == null || input2 == null) {
-                       throw new NullPointerException();
-               }
-               this.input1 = input1.copy();
-               this.input2 = input2.copy();
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be 
transformed.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @return An incomplete temporal transformation.
-        */
-       public OP onWindow(long windowSize) {
-               return onWindow(windowSize, windowSize);
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be 
transformed.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @param slideInterval
-        *            The slide size of the window.
-        * @return An incomplete temporal transformation.
-        */
-       @SuppressWarnings("unchecked")
-       public OP onWindow(long windowSize, long slideInterval) {
-               return onWindow(windowSize, slideInterval,
-                               (TimestampWrapper<I1>) 
SystemTimestamp.getWrapper(),
-                               (TimestampWrapper<I2>) 
SystemTimestamp.getWrapper());
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be 
transformed.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @param slideInterval
-        *            The slide size of the window.
-        * @param timeStamp1
-        *            The timestamp used to extract time from the elements of 
the
-        *            first data stream.
-        * @param timeStamp2
-        *            The timestamp used to extract time from the elements of 
the
-        *            second data stream.
-        * @return An incomplete temporal transformation.
-        */
-       public OP onWindow(long windowSize, long slideInterval, 
TimestampWrapper<I1> timeStamp1,
-                       TimestampWrapper<I2> timeStamp2) {
-
-               this.windowSize = windowSize;
-               this.slideInterval = slideInterval;
-
-               this.timeStamp1 = timeStamp1;
-               this.timeStamp2 = timeStamp2;
-
-               return createNextWindowOperator();
-       }
-
-       protected abstract OP createNextWindowOperator();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
new file mode 100644
index 0000000..8422400
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+
+public class StreamCrossOperator<I1, I2> extends
+               TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, 
I2>> {
+       
+       public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> 
input2) {
+               super(input1, input2);
+       }
+
+       @Override
+       protected CrossWindow<I1, I2> createNextWindowOperator() {
+
+               CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction 
= new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
+                               input1.clean(new 
CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+               return new CrossWindow<I1, I2>(this, 
input1.connect(input2).addGeneralWindowCombine(
+                               crossWindowFunction,
+                               new TupleTypeInfo<Tuple2<I1, 
I2>>(input1.getType(), input2.getType()), windowSize,
+                               slideInterval, timeStamp1, timeStamp2));
+       }
+
+       public static class CrossWindow<I1, I2> extends
+                       SingleOutputStreamOperator<Tuple2<I1, I2>, 
CrossWindow<I1, I2>> implements
+                       TemporalWindow<CrossWindow<I1, I2>> {
+
+               private StreamCrossOperator<I1, I2> op;
+
+               public CrossWindow(StreamCrossOperator<I1, I2> op, 
DataStream<Tuple2<I1, I2>> ds) {
+                       super(ds);
+                       this.op = op;
+               }
+
+               public CrossWindow<I1, I2> every(long length, TimeUnit 
timeUnit) {
+                       return every(timeUnit.toMillis(length));
+               }
+
+               @SuppressWarnings("unchecked")
+               public CrossWindow<I1, I2> every(long length) {
+                       ((CoWindowInvokable<I1, I2, ?>) 
jobGraphBuilder.getInvokable(id)).setSlideSize(length);
+                       return this;
+               }
+
+               /**
+                * Finalizes a temporal Cross transformation by applying a
+                * {@link CrossFunction} to each pair of crossed elements.<br/>
+                * Each CrossFunction call returns exactly one element.
+                * 
+                * @param function
+                *            The CrossFunction that is called for each pair of 
crossed
+                *            elements.
+                * @return The crossed data streams
+                * 
+                */
+               public <R> SingleOutputStreamOperator<R, ?> 
with(CrossFunction<I1, I2, R> function) {
+                       TypeInformation<R> outTypeInfo = 
TypeExtractor.getCrossReturnTypes(function,
+                                       op.input1.getType(), 
op.input2.getType());
+
+                       CoWindowInvokable<I1, I2, R> invokable = new 
CoWindowInvokable<I1, I2, R>(
+                                       new CrossWindowFunction<I1, I2, 
R>(clean(function)), op.windowSize,
+                                       op.slideInterval, op.timeStamp1, 
op.timeStamp2);
+
+                       jobGraphBuilder.setInvokable(id, invokable);
+
+                       return setType(outTypeInfo);
+
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
new file mode 100644
index 0000000..626b9f1
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
+public class StreamJoinOperator<I1, I2> extends
+               TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> 
{
+
+       public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) 
{
+               super(input1, input2);
+       }
+
+       @Override
+       protected JoinWindow<I1, I2> createNextWindowOperator() {
+               return new JoinWindow<I1, I2>(this);
+       }
+
+       public static class JoinWindow<I1, I2> implements 
TemporalWindow<JoinWindow<I1, I2>> {
+
+               private StreamJoinOperator<I1, I2> op;
+               private TypeInformation<I1> type1;
+
+               private JoinWindow(StreamJoinOperator<I1, I2> operator) {
+                       this.op = operator;
+                       this.type1 = op.input1.getType();
+               }
+
+               /**
+                * Continues a temporal Join transformation. <br/>
+                * Defines the {@link Tuple} fields of the first join {@link 
DataStream}
+                * that should be used as join keys.<br/>
+                * <b>Note: Fields can only be selected as join keys on Tuple
+                * DataStreams.</b><br/>
+                * 
+                * @param fields
+                *            The indexes of the other Tuple fields of the 
first join
+                *            DataStreams that should be used as keys.
+                * @return An incomplete Join transformation. Call
+                *         {@link JoinPredicate#equalTo} to continue the Join.
+                */
+               public JoinPredicate<I1, I2> where(int... fields) {
+                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
+                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1));
+               }
+
+               /**
+                * Continues a temporal join transformation. <br/>
+                * Defines the fields of the first join {@link DataStream} that 
should
+                * be used as grouping keys. Fields are the names of member 
fields of
+                * the underlying type of the data stream.
+                * 
+                * @param fields
+                *            The fields of the first join DataStream that 
should be
+                *            used as keys.
+                * @return An incomplete Join transformation. Call
+                *         {@link JoinPredicate#equalTo} to continue the Join.
+                */
+               public JoinPredicate<I1, I2> where(String... fields) {
+                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
+                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1));
+               }
+
+               /**
+                * Continues a temporal Join transformation and defines a
+                * {@link KeySelector} function for the first join {@link 
DataStream}
+                * .</br> The KeySelector function is called for each element 
of the
+                * first DataStream and extracts a single key value on which the
+                * DataStream is joined. </br>
+                * 
+                * @param keySelector
+                *            The KeySelector function which extracts the key 
values
+                *            from the DataStream on which it is joined.
+                * @return An incomplete Join transformation. Call
+                *         {@link JoinPredicate#equalTo} to continue the Join.
+                */
+               public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> 
keySelector) {
+                       return new JoinPredicate<I1, I2>(op, keySelector);
+               }
+
+               @Override
+               public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) 
{
+                       return every(timeUnit.toMillis(length));
+               }
+
+               @Override
+               public JoinWindow<I1, I2> every(long length) {
+                       op.slideInterval = length;
+                       return this;
+               }
+
+               // 
----------------------------------------------------------------------------------------
+
+       }
+
+       /**
+        * Intermediate step of a temporal Join transformation. <br/>
+        * To continue the Join transformation, select the join key of the 
second
+        * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
+        * 
+        */
+       public static class JoinPredicate<I1, I2> {
+
+               private StreamJoinOperator<I1, I2> op;
+               private KeySelector<I1, ?> keys1;
+               private KeySelector<I2, ?> keys2;
+               private TypeInformation<I2> type2;
+
+               private JoinPredicate(StreamJoinOperator<I1, I2> operator, 
KeySelector<I1, ?> keys1) {
+                       this.op = operator;
+                       this.keys1 = keys1;
+                       this.type2 = op.input2.getType();
+               }
+
+               /**
+                * Creates a temporal Join transformation and defines the 
{@link Tuple}
+                * fields of the second join {@link DataStream} that should be 
used as
+                * join keys.<br/>
+                * </p> The resulting operator wraps each pair of joining 
elements in a
+                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
+                * use {@link JoinedStream#with(JoinFunction)}
+                * 
+                * @param fields
+                *            The indexes of the Tuple fields of the second join
+                *            DataStream that should be used as keys.
+                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
+                *         apply a custom wrapping
+                */
+               public JoinedStream<I1, I2> equalTo(int... fields) {
+                       keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields, type2),
+                                       type2);
+                       return createJoinOperator();
+               }
+
+               /**
+                * Creates a temporal Join transformation and defines the 
fields of the
+                * second join {@link DataStream} that should be used as join 
keys. </p>
+                * The resulting operator wraps each pair of joining elements 
in a
+                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
+                * use {@link JoinedStream#with(JoinFunction)}
+                * 
+                * @param fields
+                *            The fields of the second join DataStream that 
should be
+                *            used as keys.
+                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
+                *         apply a custom wrapping
+                */
+               public JoinedStream<I1, I2> equalTo(String... fields) {
+                       this.keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields,
+                                       type2), type2);
+                       return createJoinOperator();
+               }
+
+               /**
+                * Creates a temporal Join transformation and defines a
+                * {@link KeySelector} function for the second join {@link 
DataStream}
+                * .</br> The KeySelector function is called for each element 
of the
+                * second DataStream and extracts a single key value on which 
the
+                * DataStream is joined. </p> The resulting operator wraps each 
pair of
+                * joining elements in a Tuple2<I1,I2>(first, second). To use a
+                * different wrapping function use
+                * {@link JoinedStream#with(JoinFunction)}
+                * 
+                * 
+                * @param keySelector
+                *            The KeySelector function which extracts the key 
values
+                *            from the second DataStream on which it is joined.
+                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
+                *         apply a custom wrapping
+                */
+               public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> 
keySelector) {
+                       this.keys2 = keySelector;
+                       return createJoinOperator();
+               }
+
+               private JoinedStream<I1, I2> createJoinOperator() {
+
+                       JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new 
DefaultJoinFunction<I1, I2>();
+
+                       JoinWindowFunction<I1, I2, Tuple2<I1, I2>> 
joinWindowFunction = getJoinWindowFunction(
+                                       joinFunction, this);
+
+                       TypeInformation<Tuple2<I1, I2>> outType = new 
TupleTypeInfo<Tuple2<I1, I2>>(
+                                       op.input1.getType(), 
op.input2.getType());
+
+                       return new JoinedStream<I1, I2>(this, op.input1
+                                       .groupBy(keys1)
+                                       .connect(op.input2.groupBy(keys2))
+                                       
.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+                                                       op.slideInterval, 
op.timeStamp1, op.timeStamp2));
+               }
+       }
+
+       public static class JoinedStream<I1, I2> extends
+                       SingleOutputStreamOperator<Tuple2<I1, I2>, 
JoinedStream<I1, I2>> {
+               private final JoinPredicate<I1, I2> predicate;
+
+               private JoinedStream(JoinPredicate<I1, I2> predicate, 
DataStream<Tuple2<I1, I2>> ds) {
+                       super(ds);
+                       this.predicate = predicate;
+               }
+
+               /**
+                * Completes a stream join. </p> The resulting operator wraps 
each pair
+                * of joining elements using the user defined {@link 
JoinFunction}
+                * 
+                * @return The joined data stream.
+                */
+               public <OUT> SingleOutputStreamOperator<OUT, ?> 
with(JoinFunction<I1, I2, OUT> joinFunction) {
+
+                       TypeInformation<OUT> outType = 
TypeExtractor.getJoinReturnTypes(joinFunction,
+                                       predicate.op.input1.getType(), 
predicate.op.input2.getType());
+
+                       CoWindowInvokable<I1, I2, OUT> invokable = new 
CoWindowInvokable<I1, I2, OUT>(
+                                       getJoinWindowFunction(joinFunction, 
predicate), predicate.op.windowSize,
+                                       predicate.op.slideInterval, 
predicate.op.timeStamp1, predicate.op.timeStamp2);
+
+                       jobGraphBuilder.setInvokable(id, invokable);
+
+                       return setType(outType);
+               }
+       }
+
+       public static final class DefaultJoinFunction<I1, I2> implements
+                       JoinFunction<I1, I2, Tuple2<I1, I2>> {
+
+               private static final long serialVersionUID = 1L;
+               private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
+
+               @Override
+               public Tuple2<I1, I2> join(I1 first, I2 second) throws 
Exception {
+                       outTuple.f0 = first;
+                       outTuple.f1 = second;
+                       return outTuple;
+               }
+       }
+
+       public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> 
getJoinWindowFunction(
+                       JoinFunction<I1, I2, OUT> joinFunction, 
JoinPredicate<I1, I2> predicate) {
+               return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, 
predicate.keys2, joinFunction);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
new file mode 100644
index 0000000..f121dfa
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
+
+       public final DataStream<I1> input1;
+       public final DataStream<I2> input2;
+
+       public long windowSize;
+       public long slideInterval;
+
+       public TimestampWrapper<I1> timeStamp1;
+       public TimestampWrapper<I2> timeStamp2;
+
+       public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
+               if (input1 == null || input2 == null) {
+                       throw new NullPointerException();
+               }
+               this.input1 = input1.copy();
+               this.input2 = input2.copy();
+       }
+
+       /**
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be 
transformed.
+        * To define sliding windows call {@link TemporalWindow#every} on the
+        * resulting operator.
+        * 
+        * @param length
+        *            The size of the window in milliseconds.
+        * @param timeUnit
+        *            The unit if time to be used
+        * @return An incomplete temporal transformation.
+        */
+       @SuppressWarnings("unchecked")
+       public OP onWindow(long length, TimeUnit timeUnit) {
+               return onWindow(timeUnit.toMillis(length),
+                               (TimestampWrapper<I1>) 
SystemTimestamp.getWrapper(),
+                               (TimestampWrapper<I2>) 
SystemTimestamp.getWrapper());
+       }
+
+       /**
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be
+        * transformed.To define sliding windows call {@link 
TemporalWindow#every}
+        * on the resulting operator.
+        * 
+        * @param windowSize
+        *            The size of the window in milliseconds.
+        * @param timeStamp1
+        *            The timestamp used to extract time from the elements of 
the
+        *            first data stream.
+        * @param timeStamp2
+        *            The timestamp used to extract time from the elements of 
the
+        *            second data stream.
+        * @return An incomplete temporal transformation.
+        */
+       public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> 
timeStamp2) {
+               return onWindow(length, timeStamp1, timeStamp2, 0);
+       }
+
+       /**
+        * Continues a temporal transformation.<br/>
+        * Defines the window size on which the two DataStreams will be
+        * transformed.To define sliding windows call {@link 
TemporalWindow#every}
+        * on the resulting operator.
+        * 
+        * @param windowSize
+        *            The size of the window in milliseconds.
+        * @param timeStamp1
+        *            The timestamp used to extract time from the elements of 
the
+        *            first data stream.
+        * @param timeStamp2
+        *            The timestamp used to extract time from the elements of 
the
+        *            second data stream.
+        * @param startTime
+        *            The start time to measure the first window
+        * @return An incomplete temporal transformation.
+        */
+       public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> 
timeStamp2,
+                       long startTime) {
+               return onWindow(length, new TimestampWrapper<I1>(timeStamp1, 
startTime),
+                               new TimestampWrapper<I2>(timeStamp2, 
startTime));
+       }
+
+       private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
+                       TimestampWrapper<I2> timeStamp2) {
+
+               this.windowSize = length;
+               this.slideInterval = length;
+
+               this.timeStamp1 = timeStamp1;
+               this.timeStamp2 = timeStamp2;
+
+               return createNextWindowOperator();
+       }
+
+       protected abstract OP createNextWindowOperator();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
new file mode 100644
index 0000000..8ac1492
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TemporalWindow<T> {
+
+       /**
+        * Defines the slide interval for this temporal operator
+        * 
+        * @param length
+        *            Length of the window
+        * @param timeUnit
+        *            Unit of time
+        * @return The temporal operator with slide interval specified
+        */
+       public T every(long length, TimeUnit timeUnit);
+
+       /**
+        * Defines the slide interval for this temporal operator
+        * 
+        * @param length
+        *            Length of the window
+        * @return The temporal operator with slide interval specified
+        */
+       public T every(long length);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index 59552f4..03219b7 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -190,4 +190,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends 
CoInvokable<IN1, IN2, OUT>
        protected void callUserFunction2() throws Exception {
        }
 
+       public void setSlideSize(long slideSize) {
+               this.slideSize = slideSize;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 9dc1c8c..f94eea4 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -35,13 +35,13 @@ import 
org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  */
 public class Time<DATA> implements WindowingHelper<DATA> {
 
-       private long length;
-       private TimeUnit granularity;
-       private TimestampWrapper<DATA> timestampWrapper;
-       private long delay;
+       protected long length;
+       protected TimeUnit granularity;
+       protected TimestampWrapper<DATA> timestampWrapper;
+       protected long delay;
 
        /**
-        * Creates an helper representing a trigger which triggers every given
+        * Creates a helper representing a trigger which triggers every given
         * length or an eviction which evicts all elements older than length.
         * 
         * @param length
@@ -62,7 +62,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
        }
 
        /**
-        * Creates an helper representing a trigger which triggers every given
+        * Creates a helper representing a trigger which triggers every given
         * length or an eviction which evicts all elements older than length.
         * 
         * @param length
@@ -160,11 +160,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
                return this;
        }
 
-       private long granularityInMillis() {
-               if (granularity != null) {
-                       return this.granularity.toMillis(this.length);
-               } else {
-                       return this.length;
-               }
+       protected long granularityInMillis() {
+               return granularity == null ? length : 
granularity.toMillis(length);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index e856f07..3da6b5f 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
@@ -98,14 +98,14 @@ public class WindowCrossJoinTest implements Serializable {
 
                inStream1
                                .join(inStream2)
-                               .onWindow(1000, 1000, new 
MyTimestamp<Tuple2<Integer, String>>(),
-                                               new 
MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0)
+                               .onWindow(1000, new MyTimestamp<Tuple2<Integer, 
String>>(),
+                                               new 
MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
                                .addSink(new JoinResultSink());
 
                inStream1
                                .cross(inStream2)
-                               .onWindow(1000, 1000, new 
MyTimestamp<Tuple2<Integer, String>>(),
-                                               new 
MyTimestamp<Tuple1<Integer>>())
+                               .onWindow(1000, new MyTimestamp<Tuple2<Integer, 
String>>(),
+                                               new 
MyTimestamp<Tuple1<Integer>>(), 100)
                                .with(new CrossFunction<Tuple2<Integer, 
String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
 
                                        private static final long 
serialVersionUID = 1L;
@@ -123,22 +123,13 @@ public class WindowCrossJoinTest implements Serializable {
                assertEquals(crossExpectedResults, crossResults);
        }
 
-       private static class MyTimestamp<T> extends TimestampWrapper<T> {
-               public MyTimestamp() {
-                       super(null, 0);
-               }
-
+       private static class MyTimestamp<T> implements Timestamp<T> {
                private static final long serialVersionUID = 1L;
 
                @Override
                public long getTimestamp(T value) {
                        return 101L;
                }
-
-               @Override
-               public long getStartTime() {
-                       return 100L;
-               }
        }
 
        private static class JoinResultSink implements

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 897ad48..dcfed50 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.join;
 
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -67,7 +68,7 @@ public class WindowJoin {
                // second windows
                DataStream<Tuple3<String, Integer, Integer>> joinedStream = 
grades
                                                .join(salaries)
-                                               .onWindow(1000)
+                                               .onWindow(1, TimeUnit.SECONDS)
                                                .where(0)
                                                .equalTo(0)
                                                .with(new MyJoinFunction());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index a19e4b4..e87d4a1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.util.Collector
 import scala.util.Random
+import java.util.concurrent.TimeUnit
 
 object WindowJoin {
 
@@ -37,10 +38,10 @@ object WindowJoin {
     val names = env.addSource(nameStream _).map(x => Name(x._1, x._2))
     val ages = env.addSource(ageStream _).map(x => Age(x._1, x._2))
 
-    //Join the two input streams by id on the last second and create new 
Person objects
-    //containing both name and age
+    //Join the two input streams by id on the last second every 2 seconds and 
create new 
+    //Person objects containing both name and age
     val joined =
-      names.join(ages).onWindow(1000)
+      names.join(ages).onWindow(1, TimeUnit.SECONDS).every(2, TimeUnit.SECONDS)
                       .where("id").equalTo("id") { (n, a) => Person(n.name, 
a.age) }
 
     joined print

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index e26db62..d620d5e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.scala
 
 import scala.reflect.ClassTag
-
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,11 +27,12 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.datastream.TemporalOperator
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.scala.StreamingConversions._
+import 
org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -72,7 +72,7 @@ object StreamCrossOperator {
 
   private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
                                            javaStream: JavaStream[(I1, I2)]) 
extends
-    DataStream[(I1, I2)](javaStream) {
+    DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
 
     /**
      * Sets a wrapper for the crossed elements. For each crossed pair, the 
result of the udf
@@ -90,6 +90,17 @@ object StreamCrossOperator {
 
       javaStream.setType(implicitly[TypeInformation[R]])
     }
+    
+    override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] 
= {
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): CrossWindow[I1, I2] = {
+      val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder()
+      val invokable = builder.getInvokable(javaStream.getId())
+      invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
+      this
+    }
   }
 
   private[flink] def getCrossWindowFunction[I1, I2, R](op: 
StreamCrossOperator[I1, I2],

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index d587d56..cb79e2a 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -20,25 +20,24 @@ package org.apache.flink.streaming.api.scala
 
 import scala.Array.canBuildFrom
 import scala.reflect.ClassTag
-
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.function.co.JoinWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions._
+import 
org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
+import 
org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
 
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) 
extends 
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
   override def createNextWindowOperator() = {
@@ -48,10 +47,11 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, 
I2]](i1, i2) {
 
 object StreamJoinOperator {
 
-  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) {
+  class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) 
extends 
+  TemporalWindow[JoinWindow[I1, I2]] {
 
     private[flink] val type1 = op.input1.getType();
-    
+
     /**
      * Continues a temporal Join transformation by defining
      * the fields in the first stream to be used as keys for the join.
@@ -60,7 +60,7 @@ object StreamJoinOperator {
      */
     def where(fields: Int*) = {
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type1),type1))
+        new Keys.ExpressionKeys(fields.toArray, type1), type1))
     }
 
     /**
@@ -69,9 +69,9 @@ object StreamJoinOperator {
      * The resulting incomplete join can be completed by 
JoinPredicate.equalTo()
      * to define the second key.
      */
-    def where(firstField: String, otherFields: String*) = 
+    def where(firstField: String, otherFields: String*) =
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: 
otherFields.toArray,type1),type1))  
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), 
type1))
 
     /**
      * Continues a temporal Join transformation by defining
@@ -89,10 +89,19 @@ object StreamJoinOperator {
       new JoinPredicate[I1, I2](op, keyExtractor)
     }
 
+    override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = 
{
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): JoinWindow[I1, I2] = {
+      op.slideInterval = length
+      this
+    }
+
   }
 
   class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, 
I2],
-                              private[flink] val keys1: KeySelector[I1, _]) {
+    private[flink] val keys1: KeySelector[I1, _]) {
     private[flink] var keys2: KeySelector[I2, _] = null
     private[flink] val type2 = op.input2.getType();
 
@@ -104,7 +113,7 @@ object StreamJoinOperator {
      */
     def equalTo(fields: Int*): JoinedStream[I1, I2] = {
       finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(fields.toArray,type2),type2))
+        new Keys.ExpressionKeys(fields.toArray, type2), type2))
     }
 
     /**
@@ -113,9 +122,9 @@ object StreamJoinOperator {
      * (first, second)
      * To define a custom wrapping, use JoinedStream.apply(...)
      */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, 
I2] = 
-     finish(KeySelectorUtil.getSelectorForKeys(
-          new Keys.ExpressionKeys(firstField +: 
otherFields.toArray,type2),type2))
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, 
I2] =
+      finish(KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), 
type2))
 
     /**
      * Creates a temporal join transformation by defining the second join key.
@@ -159,11 +168,11 @@ object StreamJoinOperator {
 
       return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
         .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-        returnType, op.windowSize, op.slideInterval, op.timeStamp1, 
op.timeStamp2)
+          returnType, op.windowSize, op.slideInterval, op.timeStamp1, 
op.timeStamp2)
     }
   }
 
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: 
JavaStream[(I1, I2)]) extends
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: 
JavaStream[(I1, I2)]) extends 
   DataStream[(I1, I2)](javaStream) {
 
     private val op = jp.op
@@ -186,7 +195,7 @@ object StreamJoinOperator {
   }
 
   private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, 
I2],
-                                                      joinFunction: (I1, I2) 
=> R) = {
+    joinFunction: (I1, I2) => R) = {
     Validate.notNull(joinFunction, "Join function must not be null.")
 
     val joinFun = new JoinFunction[I1, I2, R] {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b0a2e4a1/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
new file mode 100644
index 0000000..fd3a4a9
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.temporaloperator.{ 
TemporalOperator => JTempOp }
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import 
org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
+
+abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
+  i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
+
+  def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long 
= 0): OP = {
+    val timeStamp1 = getTS(ts1)
+    val timeStamp2 = getTS(ts2)
+    onWindow(length, timeStamp1, timeStamp2, startTime)
+  }
+
+  def getTS[R](ts: R => Long): Timestamp[R] = {
+    new Timestamp[R] {
+      val cleanFun = clean(ts, true)
+      def getTimestamp(in: R) = cleanFun(in)
+    }
+  }
+
+}

Reply via email to