[streaming] Temporal join and cross rework for consistency and extended features


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1c87d8bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1c87d8bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1c87d8bc

Branch: refs/heads/master
Commit: 1c87d8bcf07ea8a4f6c4bf0d7b39b20aea36b225
Parents: f165c35
Author: Gyula Fora <[email protected]>
Authored: Fri Dec 19 21:05:01 2014 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Jan 2 18:34:38 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     |  55 +--------
 .../api/datastream/StreamCrossOperator.java     |  50 ++++++--
 .../api/datastream/StreamJoinOperator.java      | 123 +++++++++++++------
 .../api/datastream/TemporalOperator.java        | 103 ++++++++++++++++
 .../api/datastream/WindowDBOperator.java        | 103 ----------------
 .../api/function/co/CrossWindowFunction.java    |  21 ++--
 .../api/function/co/JoinWindowFunction.java     |  17 ++-
 .../streaming/api/WindowCrossJoinTest.java      |   2 +-
 .../streaming/examples/join/WindowJoin.java     |   3 +-
 .../flink/api/scala/streaming/DataStream.scala  |   6 +-
 10 files changed, 255 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index c663315..65a6c37 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,15 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.List;
-
-import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +38,6 @@ import 
org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.util.Collector;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It
@@ -545,54 +539,7 @@ public class ConnectedDataStream<IN1, IN2> {
                return invokable;
        }
 
-       protected <OUT> SingleOutputStreamOperator<OUT, ?> 
addGeneralWindowCross(
-                       CrossFunction<IN1, IN2, OUT> crossFunction, long 
windowSize, long slideInterval,
-                       TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
-
-               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CrossFunction.class,
-                               crossFunction.getClass(), 2, null, null);
-
-               CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new 
CrossWindowFunction<IN1, IN2, OUT>(
-                               clean(crossFunction));
-
-               return addGeneralWindowCombine(crossWindowFunction, 
outTypeInfo, windowSize, slideInterval,
-                               timestamp1, timestamp2);
-       }
-
-       private static class CrossWindowFunction<IN1, IN2, OUT> implements
-                       CoWindowFunction<IN1, IN2, OUT> {
-
-               private static final long serialVersionUID = 1L;
-
-               private CrossFunction<IN1, IN2, OUT> crossFunction;
-
-               public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> 
crossFunction) {
-                       this.crossFunction = crossFunction;
-               }
-
-               @Override
-               public void coWindow(List<IN1> first, List<IN2> second, 
Collector<OUT> out)
-                               throws Exception {
-                       for (IN1 firstValue : first) {
-                               for (IN2 secondValue : second) {
-                                       
out.collect(crossFunction.cross(firstValue, secondValue));
-                               }
-                       }
-               }
-       }
-
-       protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> 
addGeneralWindowJoin(
-                       CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> 
coWindowFunction, long windowSize,
-                       long slideInterval, TimeStamp<IN1> timestamp1, 
TimeStamp<IN2> timestamp2) {
-
-               TypeInformation<Tuple2<IN1, IN2>> outType = new 
TupleTypeInfo<Tuple2<IN1, IN2>>(
-                               getInputType1(), getInputType2());
-
-               return addGeneralWindowCombine(coWindowFunction, outType, 
windowSize, slideInterval,
-                               timestamp1, timestamp2);
-       }
-
-       private <OUT> SingleOutputStreamOperator<OUT, ?> 
addGeneralWindowCombine(
+       protected <OUT> SingleOutputStreamOperator<OUT, ?> 
addGeneralWindowCombine(
                        CoWindowFunction<IN1, IN2, OUT> coWindowFunction, 
TypeInformation<OUT> outTypeInfo,
                        long windowSize, long slideInterval, TimeStamp<IN1> 
timestamp1,
                        TimeStamp<IN2> timestamp2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
index f54f273..c6cba63 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamCrossOperator.java
@@ -19,9 +19,16 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
 
-public class StreamCrossOperator<I1, I2> extends WindowDBOperator<I1, I2, 
StreamCrossOperator.CrossWindow<I1, I2>> {
+public class StreamCrossOperator<I1, I2> extends
+               TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, 
I2>> {
 
        public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> 
input2) {
                super(input1, input2);
@@ -40,12 +47,23 @@ public class StreamCrossOperator<I1, I2> extends 
WindowDBOperator<I1, I2, Stream
                        this.op = operator;
                }
 
+               public <F> F clean(F f) {
+                       if 
(op.input1.getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
+                               ClosureCleaner.clean(f, true);
+                       }
+                       ClosureCleaner.ensureSerializable(f);
+                       return f;
+               }
+
                /**
-                * Finalizes a temporal Cross transformation by applying a 
{@link CrossFunction} to each pair of crossed elements.<br/>
-                * Each CrossFunction call returns exactly one element. 
+                * Finalizes a temporal Cross transformation by applying a
+                * {@link CrossFunction} to each pair of crossed elements.<br/>
+                * Each CrossFunction call returns exactly one element.
                 * 
-                * @param function The CrossFunction that is called for each 
pair of crossed elements.
-                * @return An CrossOperator that represents the crossed result 
DataSet
+                * @param function
+                *            The CrossFunction that is called for each pair of 
crossed
+                *            elements.
+                * @return A CrossOperator that represents the crossed result 
DataStream
                 * 
                 * @see CrossFunction
                 * @see DataSet
@@ -54,15 +72,29 @@ public class StreamCrossOperator<I1, I2> extends 
WindowDBOperator<I1, I2, Stream
                        return createCrossOperator(function);
                }
 
+               /**
+                * Finalizes a temporal Cross transformation by emitting all 
pairs in a
+                * new Tuple2.
+                * 
+                * @return A CrossOperator that represents the crossed result 
DataStream
+                */
+               public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
withDefault() {
+                       return createCrossOperator(new 
CrossOperator.DefaultCrossFunction<I1, I2>());
+               }
+
                protected <R> SingleOutputStreamOperator<R, ?> 
createCrossOperator(
                                CrossFunction<I1, I2, R> function) {
 
-                       return 
op.input1.connect(op.input2).addGeneralWindowCross(function, op.windowSize,
-                                       op.slideInterval, op.timeStamp1, 
op.timeStamp2);
+                       TypeInformation<R> outTypeInfo = 
TypeExtractor.getCrossReturnTypes(function,
+                                       op.input1.getType(), 
op.input2.getType());
 
-               }
+                       CrossWindowFunction<I1, I2, R> crossWindowFunction = 
new CrossWindowFunction<I1, I2, R>(
+                                       clean(function));
 
-               // 
----------------------------------------------------------------------------------------
+                       return 
op.input1.connect(op.input2).addGeneralWindowCombine(crossWindowFunction,
+                                       outTypeInfo, op.windowSize, 
op.slideInterval, op.timeStamp1, op.timeStamp2);
+
+               }
 
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index 89c80ab..3051587 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
 import org.apache.flink.streaming.util.keys.PojoKeySelector;
 
 public class StreamJoinOperator<I1, I2> extends
-               WindowDBOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> 
{
+               TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> 
{
 
        public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) 
{
                super(input1, input2);
@@ -51,7 +54,7 @@ public class StreamJoinOperator<I1, I2> extends
                 * that should be used as join keys.<br/>
                 * <b>Note: Fields can only be selected as join keys on Tuple
                 * DataStreams.</b><br/>
-                *
+                * 
                 * @param fields
                 *            The indexes of the other Tuple fields of the 
first join
                 *            DataStreams that should be used as keys.
@@ -59,8 +62,8 @@ public class StreamJoinOperator<I1, I2> extends
                 *         {@link JoinPredicate#equalTo} to continue the Join.
                 */
                public JoinPredicate<I1, I2> where(int... fields) {
-                       return new JoinPredicate<I1, I2>(op, 
FieldsKeySelector.getSelector(
-                                       op.input1.getType(), fields));
+                       return new JoinPredicate<I1, I2>(op, 
FieldsKeySelector.getSelector(op.input1.getType(),
+                                       fields));
                }
 
                /**
@@ -68,7 +71,7 @@ public class StreamJoinOperator<I1, I2> extends
                 * Defines the fields of the first join {@link DataStream} that 
should
                 * be used as grouping keys. Fields are the names of member 
fields of
                 * the underlying type of the data stream.
-                *
+                * 
                 * @param fields
                 *            The fields of the first join DataStream that 
should be
                 *            used as keys.
@@ -105,12 +108,13 @@ public class StreamJoinOperator<I1, I2> extends
         * Intermediate step of a temporal Join transformation. <br/>
         * To continue the Join transformation, select the join key of the 
second
         * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
-        *
+        * 
         */
        public static class JoinPredicate<I1, I2> {
 
-               private StreamJoinOperator<I1, I2> op;
-               private final KeySelector<I1, ?> keys1;
+               public StreamJoinOperator<I1, I2> op;
+               public KeySelector<I1, ?> keys1;
+               public KeySelector<I2, ?> keys2;
 
                private JoinPredicate(StreamJoinOperator<I1, I2> operator, 
KeySelector<I1, ?> keys1) {
                        this.op = operator;
@@ -124,37 +128,30 @@ public class StreamJoinOperator<I1, I2> extends
                 * <b>Note: Fields can only be selected as join keys on Tuple
                 * DataStreams.</b><br/>
                 * 
-                * The resulting operator wraps each pair of joining elements 
into a
-                * {@link Tuple2}, with the element of the first input being 
the first
-                * field of the tuple and the element of the second input being 
the
-                * second field of the tuple.
-                *
                 * @param fields
                 *            The indexes of the Tuple fields of the second join
                 *            DataStream that should be used as keys.
-                * @return The joined data stream.
+                * @return An incomplete join. Call {@link 
FinalizeStreamJoin#with} or
+                *         {@link FinalizeStreamJoin#withDefault} to complete
                 */
-               public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
equalTo(int... fields) {
-                       return 
createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(),
-                                       fields));
+               public FinalizeStreamJoin<I1, I2> equalTo(int... fields) {
+                       keys2 = 
FieldsKeySelector.getSelector(op.input2.getType(), fields);
+                       return new FinalizeStreamJoin<I1, I2>(this);
                }
 
                /**
                 * Continues a temporal Join transformation and defines the 
fields of
                 * the second join {@link DataStream} that should be used as 
join keys.<br/>
-                *
-                * The resulting operator wraps each pair of joining elements 
into a
-                * {@link Tuple2}, with the element of the first input being 
the first
-                * field of the tuple and the element of the second input being 
the
-                * second field of the tuple.
-                *
+                * 
                 * @param fields
                 *            The fields of the second join DataStream that 
should be
                 *            used as keys.
-                * @return The joined data stream.
+                * @return An incomplete join. Call {@link 
FinalizeStreamJoin#with} or
+                *         {@link FinalizeStreamJoin#withDefault} to complete
                 */
-               public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
equalTo(String... fields) {
-                       return createJoinOperator(new 
PojoKeySelector<I2>(op.input2.getType(), fields));
+               public FinalizeStreamJoin<I1, I2> equalTo(String... fields) {
+                       this.keys2 = new 
PojoKeySelector<I2>(op.input2.getType(), fields);
+                       return new FinalizeStreamJoin<I1, I2>(this);
                }
 
                /**
@@ -164,29 +161,75 @@ public class StreamJoinOperator<I1, I2> extends
                 * second DataStream and extracts a single key value on which 
the
                 * DataStream is joined. </br>
                 * 
-                * The resulting operator wraps each pair of joining elements 
into a
-                * {@link Tuple2}, with the element of the first input being 
the first
-                * field of the tuple and the element of the second input being 
the
-                * second field of the tuple.
-                * 
                 * @param keySelector
                 *            The KeySelector function which extracts the key 
values
                 *            from the second DataStream on which it is joined.
+                * @return An incomplete join. Call {@link 
FinalizeStreamJoin#with} or
+                *         {@link FinalizeStreamJoin#withDefault} to complete
+                */
+               public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, 
K> keySelector) {
+                       this.keys2 = keySelector;
+                       return new FinalizeStreamJoin<I1, I2>(this);
+               }
+
+       }
+
+       public static class FinalizeStreamJoin<I1, I2> {
+               private final JoinPredicate<I1, I2> predicate;
+
+               private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) {
+                       this.predicate = predicate;
+               }
+
+               /**
+                * Completes a stream join. <p> The resulting operator wraps 
each pair
+                * of joining elements into a {@link Tuple2}, with the element 
of the
+                * first input being the first field of the tuple and the 
element of the
+                * second input being the second field of the tuple.
+                * 
+                * @return The joined data stream.
+                */
+               public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
withDefault() {
+                       return createJoinOperator(new DefaultJoinFunction<I1, 
I2>());
+               }
+
+               /**
+                * Completes a stream join. <p> The resulting operator combines each
+                * pair of joining elements using the user-defined {@link JoinFunction}.
+                * 
                 * @return The joined data stream.
                 */
-               public <K> SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
equalTo(
-                               KeySelector<I2, K> keySelector) {
-                       return createJoinOperator(keySelector);
+               public <OUT> SingleOutputStreamOperator<OUT, ?> 
with(JoinFunction<I1, I2, OUT> joinFunction) {
+                       return createJoinOperator(joinFunction);
                }
 
-               protected SingleOutputStreamOperator<Tuple2<I1, I2>, ?> 
createJoinOperator(
-                               KeySelector<I2, ?> keys2) {
+               private <OUT> SingleOutputStreamOperator<OUT, ?> 
createJoinOperator(
+                               JoinFunction<I1, I2, OUT> joinFunction) {
+
+                       JoinWindowFunction<I1, I2, OUT> joinWindowFunction = 
new JoinWindowFunction<I1, I2, OUT>(
+                                       predicate.keys1, predicate.keys2, 
joinFunction);
+
+                       StreamJoinOperator<I1, I2> op = predicate.op;
+
+                       TypeInformation<OUT> outType = 
TypeExtractor.getJoinReturnTypes(joinFunction,
+                                       op.input1.getType(), 
op.input2.getType());
 
-                       JoinWindowFunction<I1, I2> joinWindowFunction = new 
JoinWindowFunction<I1, I2>(keys1,
-                                       keys2);
-                       return 
op.input1.connect(op.input2).addGeneralWindowJoin(joinWindowFunction,
-                                       op.windowSize, op.slideInterval, 
op.timeStamp1, op.timeStamp2);
+                       return 
op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction,
+                                       outType, op.windowSize, 
op.slideInterval, op.timeStamp1, op.timeStamp2);
                }
        }
 
+       public static final class DefaultJoinFunction<T1, T2> implements
+                       JoinFunction<T1, T2, Tuple2<T1, T2>> {
+
+               private static final long serialVersionUID = 1L;
+               private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
+
+               @Override
+               public Tuple2<T1, T2> join(T1 first, T2 second) throws 
Exception {
+                       outTuple.f0 = first;
+                       outTuple.f1 = second;
+                       return outTuple;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
new file mode 100644
index 0000000..cd8aabd
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+public abstract class TemporalOperator<I1, I2, OP> {
+
+       public final DataStream<I1> input1;
+       public final DataStream<I2> input2;
+
+       public long windowSize;
+       public long slideInterval;
+
+       public TimeStamp<I1> timeStamp1;
+       public TimeStamp<I2> timeStamp2;
+
+       public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
+               if (input1 == null || input2 == null) {
+                       throw new NullPointerException();
+               }
+               this.input1 = input1.copy();
+               this.input2 = input2.copy();
+       }
+
+       /**
+        * Continues a temporal transformation (join or cross).<br/>
+        * Defines the window size on which the two DataStreams will be combined.
+        * 
+        * @param windowSize
+        *            The size of the window in milliseconds.
+        * @return The next step of the temporal transformation. For a join,
+        *         call {@link StreamJoinOperator.JoinWindow#where} to continue.
+        */
+       public OP onWindow(long windowSize) {
+               return onWindow(windowSize, windowSize);
+       }
+
+       /**
+        * Continues a temporal transformation (join or cross).<br/>
+        * Defines the window size on which the two DataStreams will be combined.
+        * 
+        * @param windowSize
+        *            The size of the window in milliseconds.
+        * @param slideInterval
+        *            The slide size of the window.
+        * @return The next step of the temporal transformation. For a join,
+        *         call {@link StreamJoinOperator.JoinWindow#where} to continue.
+        */
+       public OP onWindow(long windowSize, long slideInterval) {
+               return onWindow(windowSize, slideInterval, new 
DefaultTimeStamp<I1>(),
+                               new DefaultTimeStamp<I2>());
+       }
+
+       /**
+        * Continues a temporal transformation (join or cross).<br/>
+        * Defines the window size on which the two DataStreams will be combined.
+        * 
+        * @param windowSize
+        *            The size of the window in milliseconds.
+        * @param slideInterval
+        *            The slide size of the window.
+        * @param timeStamp1
+        *            The timestamp used to extract time from the elements of 
the
+        *            first data stream.
+        * @param timeStamp2
+        *            The timestamp used to extract time from the elements of 
the
+        *            second data stream.
+        * @return The next step of the temporal transformation. For a join,
+        *         call {@link StreamJoinOperator.JoinWindow#where} to continue.
+        */
+       public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> 
timeStamp1,
+                       TimeStamp<I2> timeStamp2) {
+
+               this.windowSize = windowSize;
+               this.slideInterval = slideInterval;
+
+               this.timeStamp1 = timeStamp1;
+               this.timeStamp2 = timeStamp2;
+
+               return createNextWindowOperator();
+       }
+
+       protected abstract OP createNextWindowOperator();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java
deleted file mode 100644
index 4f6f0c1..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDBOperator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-public abstract class WindowDBOperator<I1, I2, OP> {
-
-       protected final DataStream<I1> input1;
-       protected final DataStream<I2> input2;
-
-       long windowSize;
-       long slideInterval;
-
-       TimeStamp<I1> timeStamp1;
-       TimeStamp<I2> timeStamp2;
-
-       public WindowDBOperator(DataStream<I1> input1, DataStream<I2> input2) {
-               if (input1 == null || input2 == null) {
-                       throw new NullPointerException();
-               }
-               this.input1 = input1.copy();
-               this.input2 = input2.copy();
-       }
-
-       /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
-        */
-       public OP onWindow(long windowSize) {
-               return onWindow(windowSize, windowSize);
-       }
-
-       /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @param slideInterval
-        *            The slide size of the window.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
-        */
-       public OP onWindow(long windowSize, long slideInterval) {
-               return onWindow(windowSize, slideInterval, new 
DefaultTimeStamp<I1>(),
-                               new DefaultTimeStamp<I2>());
-       }
-
-       /**
-        * Continues a temporal Join transformation.<br/>
-        * Defines the window size on which the two DataStreams will be joined.
-        * 
-        * @param windowSize
-        *            The size of the window in milliseconds.
-        * @param slideInterval
-        *            The slide size of the window.
-        * @param timeStamp1
-        *            The timestamp used to extract time from the elements of 
the
-        *            first data stream.
-        * @param timeStamp2
-        *            The timestamp used to extract time from the elements of 
the
-        *            second data stream.
-        * @return An incomplete Join transformation. Call {@link 
JoinWindow#where}
-        *         to continue the Join.
-        */
-       public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> 
timeStamp1,
-                       TimeStamp<I2> timeStamp2) {
-
-               this.windowSize = windowSize;
-               this.slideInterval = slideInterval;
-
-               this.timeStamp1 = timeStamp1;
-               this.timeStamp2 = timeStamp2;
-
-               return createNextWindowOperator();
-       }
-       
-       protected abstract OP createNextWindowOperator();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
index f61738a..9cafcd1 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java
@@ -20,20 +20,25 @@ package org.apache.flink.streaming.api.function.co;
 
 import java.util.List;
 
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.util.Collector;
 
-public class CrossWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, 
IN2, Tuple2<IN1, IN2>> {
+public class CrossWindowFunction<IN1, IN2, OUT> implements 
CoWindowFunction<IN1, IN2, OUT> {
        private static final long serialVersionUID = 1L;
 
-       @Override
-       public void coWindow(List<IN1> first, List<IN2> second, 
Collector<Tuple2<IN1, IN2>> out)
-                       throws Exception {
+       private CrossFunction<IN1, IN2, OUT> crossFunction;
+
+       public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
+               this.crossFunction = crossFunction;
+       }
 
-               for (IN1 item1 : first) {
-                       for (IN2 item2 : second) {
-                               out.collect(new Tuple2<IN1, IN2>(item1, item2));
+       @Override
+       public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> 
out) throws Exception {
+               for (IN1 firstValue : first) {
+                       for (IN2 secondValue : second) {
+                               out.collect(crossFunction.cross(firstValue, 
secondValue));
                        }
                }
        }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
index 53e2657..9f5cd5d 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
@@ -20,31 +20,30 @@ package org.apache.flink.streaming.api.function.co;
 
 import java.util.List;
 
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
-public class JoinWindowFunction<IN1, IN2> implements CoWindowFunction<IN1, 
IN2, Tuple2<IN1, IN2>> {
+public class JoinWindowFunction<IN1, IN2, OUT> implements 
CoWindowFunction<IN1, IN2, OUT> {
        private static final long serialVersionUID = 1L;
 
        private KeySelector<IN1, ?> keySelector1;
        private KeySelector<IN2, ?> keySelector2;
+       private JoinFunction<IN1, IN2, OUT> joinFunction;
 
-       public JoinWindowFunction() {
-       }
-
-       public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, 
KeySelector<IN2, ?> keySelector2) {
+       public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, 
KeySelector<IN2, ?> keySelector2,
+                       JoinFunction<IN1, IN2, OUT> joinFunction) {
                this.keySelector1 = keySelector1;
                this.keySelector2 = keySelector2;
+               this.joinFunction = joinFunction;
        }
 
        @Override
-       public void coWindow(List<IN1> first, List<IN2> second, 
Collector<Tuple2<IN1, IN2>> out)
-                       throws Exception {
+       public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> 
out) throws Exception {
                for (IN1 item1 : first) {
                        for (IN2 item2 : second) {
                                if 
(keySelector1.getKey(item1).equals(keySelector2.getKey(item2))) {
-                                       out.collect(new Tuple2<IN1, IN2>(item1, 
item2));
+                                       out.collect(joinFunction.join(item1, 
item2));
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index 37f8c0a..07d40ff 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable {
                DataStream<Integer> inStream2 = env.fromCollection(in2);
 
                inStream1.join(inStream2).onWindow(1000, 1000, new 
MyTimestamp1(), new MyTimestamp2())
-                               .where(0).equalTo(0).addSink(new 
JoinResultSink());
+                               .where(0).equalTo(0).withDefault().addSink(new 
JoinResultSink());
 
                inStream1.cross(inStream2).onWindow(1000, 1000, new 
MyTimestamp1(), new MyTimestamp2())
                                .with(new CrossFunction<Tuple2<Integer,String>, 
Integer, Tuple2<Tuple2<Integer,String>, Integer>>() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 93df823..2586e3c 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -67,7 +67,8 @@ public class WindowJoin {
                                .join(salaries)
                                .onWindow(1000)
                                .where(0)
-                               .equalTo(0);
+                               .equalTo(0)
+                               .withDefault();
 
                // emit result
                if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1c87d8bc/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 5f46c84..a117412 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -284,7 +284,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * received records.
    *
    */
-  def count: DataStream[java.lang.Long] = new 
DataStream[java.lang.Long](javaStream.count())
+  def count: DataStream[Long] = new 
DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]]
 
   /**
    * Creates a new DataStream by applying the given function to every element of this DataStream.
@@ -445,14 +445,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new SplitDataStream that contains only the elements satisfying the
    *  given output selector predicate.
    */
-  def split(fun: T => TraversableOnce[String]): SplitDataStream[T] = {
+  def split(fun: T => String): SplitDataStream[T] = {
     if (fun == null) {
       throw new NullPointerException("OutputSelector must not be null.")
     }
     val selector = new OutputSelector[T] {
       val cleanFun = clean(fun)
       def select(in: T): java.lang.Iterable[String] = {
-        asJavaIterable(cleanFun(in).toIterable)
+        List(cleanFun(in))
       }
     }
     split(selector)

Reply via email to