[streaming] Deleted obsolete parts of the connected stream api

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd28fcc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd28fcc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd28fcc

Branch: refs/heads/release-0.8
Commit: fcd28fccd38a070f69676f48b2038af9c6e5eb4e
Parents: fb86fde
Author: Gyula Fora <[email protected]>
Authored: Mon Jan 5 15:59:50 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Tue Jan 6 00:22:58 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/CoBatchedDataStream.java     | 126 --------
 .../api/datastream/CoWindowDataStream.java      | 112 -------
 .../api/datastream/ConnectedDataStream.java     | 167 ----------
 .../operator/co/CoBatchReduceInvokable.java     | 316 -------------------
 .../co/CoGroupedBatchReduceInvokable.java       |  79 -----
 .../co/CoGroupedWindowReduceInvokable.java      | 158 ----------
 .../operator/co/CoWindowReduceInvokable.java    | 189 -----------
 .../invokable/operator/CoBatchReduceTest.java   | 137 --------
 .../operator/CoGroupedBatchReduceTest.java      | 172 ----------
 .../operator/CoGroupedWindowReduceTest.java     | 211 -------------
 .../invokable/operator/CoWindowReduceTest.java  | 172 ----------
 11 files changed, 1839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
deleted file mode 100644
index 3b58188..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-
-/**
- * A {@link CoBatchedDataStream} represents a two data stream whose elements 
are
- * batched together in sliding batches. Operation
- * {@link #reduce(ReduceFunction)} can be applied for each batch and the batch
- * is slid afterwards.
- *
- * @param <IN1>
- *            The type of the first input data stream
- * @param <IN2>
- *            The type of the second input data stream
- */
-public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, 
IN2> {
-
-       protected long batchSize1;
-       protected long batchSize2;
-       protected long slideSize1;
-       protected long slideSize2;
-
-       protected CoBatchedDataStream(DataStream<IN1> dataStream1, 
DataStream<IN2> dataStream2,
-                       long batchSize1, long batchSize2, long slideSize1, long 
slideSize2) {
-               super(dataStream1, dataStream2);
-               this.batchSize1 = batchSize1;
-               this.batchSize2 = batchSize2;
-               this.slideSize1 = slideSize1;
-               this.slideSize2 = slideSize2;
-       }
-
-       protected CoBatchedDataStream(ConnectedDataStream<IN1, IN2> 
coDataStream, long batchSize1,
-                       long batchSize2, long slideSize1, long slideSize2) {
-               super(coDataStream);
-               this.batchSize1 = batchSize1;
-               this.batchSize2 = batchSize2;
-               this.slideSize1 = slideSize1;
-               this.slideSize2 = slideSize2;
-       }
-
-       protected CoBatchedDataStream(CoBatchedDataStream<IN1, IN2> 
coBatchedDataStream) {
-               super(coBatchedDataStream);
-               this.batchSize1 = coBatchedDataStream.batchSize1;
-               this.batchSize2 = coBatchedDataStream.batchSize2;
-               this.slideSize1 = coBatchedDataStream.slideSize1;
-               this.slideSize2 = coBatchedDataStream.slideSize2;
-       }
-
-       /**
-        * Groups the elements of the {@link CoBatchedDataStream} by the given 
key
-        * positions to be used with grouped operators.
-        * 
-        * @param keyPosition1
-        *            The position of the field on which the first input data 
stream
-        *            will be grouped.
-        * @param keyPosition2
-        *            The position of the field on which the second input data
-        *            stream will be grouped.
-        * @return The transformed {@link CoBatchedDataStream}
-        */
-       public CoBatchedDataStream<IN1, IN2> groupBy(int keyPosition1, int 
keyPosition2) {
-               return new CoBatchedDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPosition1),
-                               dataStream2.groupBy(keyPosition2), batchSize1, 
batchSize2, slideSize1, slideSize2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] 
keyPositions2) {
-               return new CoBatchedDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPositions1),
-                               dataStream2.groupBy(keyPositions2), batchSize1, 
batchSize2, slideSize1, slideSize2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(String field1, String 
field2) {
-               return new CoBatchedDataStream<IN1, 
IN2>(dataStream1.groupBy(field1),
-                               dataStream2.groupBy(field2), batchSize1, 
batchSize2, slideSize1, slideSize2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] 
fields2) {
-               return new CoBatchedDataStream<IN1, 
IN2>(dataStream1.groupBy(fields1),
-                               dataStream2.groupBy(fields2), batchSize1, 
batchSize2, slideSize1, slideSize2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> 
keySelector1,
-                       KeySelector<IN2, ?> keySelector2) {
-               return new CoBatchedDataStream<IN1, 
IN2>(dataStream1.groupBy(keySelector1),
-                               dataStream2.groupBy(keySelector2), batchSize1, 
batchSize2, slideSize1, slideSize2);
-       }
-
-       @Override
-       protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-                       CoReduceFunction<IN1, IN2, OUT> coReducer) {
-               CoBatchReduceInvokable<IN1, IN2, OUT> invokable;
-               if (isGrouped) {
-                       invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer), batchSize1,
-                                       batchSize2, slideSize1, slideSize2, 
keySelector1, keySelector2);
-               } else {
-                       invokable = new CoBatchReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer), batchSize1,
-                                       batchSize2, slideSize1, slideSize2);
-               }
-               return invokable;
-       }
-
-       protected CoBatchedDataStream<IN1, IN2> copy() {
-               return new CoBatchedDataStream<IN1, IN2>(this);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
deleted file mode 100644
index 9129f9e..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * A {@link CoWindowDataStream} represents two data streams whose elements are
- * batched together into sliding windows. Operation
- * {@link #reduce(CoReduceFunction)} can be applied for each window.
- * 
- * @param <IN1>
- *            The type of the first input data stream
- * @param <IN2>
- *            The type of the second input data stream
- */
-public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, 
IN2> {
-       TimestampWrapper<IN1> timeStamp1;
-       TimestampWrapper<IN2> timeStamp2;
-
-       protected CoWindowDataStream(DataStream<IN1> dataStream1, 
DataStream<IN2> dataStream2,
-                       long windowSize1, long windowSize2, long 
slideInterval1, long slideInterval2,
-                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
-               super(dataStream1, dataStream2, windowSize1, windowSize2, 
slideInterval1, slideInterval2);
-               this.timeStamp1 = timeStamp1;
-               this.timeStamp2 = timeStamp2;
-       }
-
-       protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> 
coDataStream, long windowSize1,
-                       long windowSize2, long slideInterval1, long 
slideInterval2,
-                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
-               super(coDataStream, windowSize1, windowSize2, slideInterval1, 
slideInterval2);
-               this.timeStamp1 = timeStamp1;
-               this.timeStamp2 = timeStamp2;
-       }
-
-       protected CoWindowDataStream(CoWindowDataStream<IN1, IN2> 
coWindowDataStream) {
-               super(coWindowDataStream);
-               this.timeStamp1 = coWindowDataStream.timeStamp1;
-               this.timeStamp2 = coWindowDataStream.timeStamp2;
-       }
-
-       public CoWindowDataStream<IN1, IN2> groupBy(int keyPosition1, int 
keyPosition2) {
-               return new CoWindowDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPosition1),
-                               dataStream2.groupBy(keyPosition2), batchSize1, 
batchSize2, slideSize1, slideSize2,
-                               timeStamp1, timeStamp2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] 
keyPositions2) {
-               return new CoWindowDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPositions1),
-                               dataStream2.groupBy(keyPositions2), batchSize1, 
batchSize2, slideSize1, slideSize2,
-                               timeStamp1, timeStamp2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(String field1, String 
field2) {
-               return new CoWindowDataStream<IN1, 
IN2>(dataStream1.groupBy(field1),
-                               dataStream2.groupBy(field2), batchSize1, 
batchSize2, slideSize1, slideSize2,
-                               timeStamp1, timeStamp2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] 
fields2) {
-               return new CoWindowDataStream<IN1, 
IN2>(dataStream1.groupBy(fields1),
-                               dataStream2.groupBy(fields2), batchSize1, 
batchSize2, slideSize1, slideSize2,
-                               timeStamp1, timeStamp2);
-       }
-
-       public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> 
keySelector1,
-                       KeySelector<IN2, ?> keySelector2) {
-               return new CoWindowDataStream<IN1, 
IN2>(dataStream1.groupBy(keySelector1),
-                               dataStream2.groupBy(keySelector2), batchSize1, 
batchSize2, slideSize1, slideSize2,
-                               timeStamp1, timeStamp2);
-       }
-
-       @Override
-       protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-                       CoReduceFunction<IN1, IN2, OUT> coReducer) {
-               CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
-               if (isGrouped) {
-                       invokable = new CoGroupedWindowReduceInvokable<IN1, 
IN2, OUT>(clean(coReducer),
-                                       batchSize1, batchSize2, slideSize1, 
slideSize2, keySelector1, keySelector2,
-                                       timeStamp1, timeStamp2);
-               } else {
-                       invokable = new CoWindowReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer), batchSize1,
-                                       batchSize2, slideSize1, slideSize2, 
timeStamp1, timeStamp2);
-               }
-               return invokable;
-       }
-
-       protected CoWindowDataStream<IN1, IN2> copy() {
-               return new CoWindowDataStream<IN1, IN2>(this);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index efd9531..a0c8ff8 100755
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -227,173 +227,6 @@ public class ConnectedDataStream<IN1, IN2> {
        }
 
        /**
-        * Batch operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into sliding batches creating a new
-        * {@link CoBatchedDataStream}. Then the user can apply
-        * {@link CoBatchedDataStream#reduce} transformation on the
-        * {@link CoBatchedDataStream}.
-        * 
-        * @param batchSize1
-        *            The number of elements in each batch of the first input 
data
-        *            stream
-        * @param batchSize2
-        *            The number of elements in each batch of the second input 
data
-        *            stream
-        * @param slideSize1
-        *            The number of elements with which the batches of the first
-        *            input data stream are slid by after each transformation.
-        * @param slideSize2
-        *            The number of elements with which the batches of the 
second
-        *            input data stream are slid by after each transformation.
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long 
batchSize2, long slideSize1,
-                       long slideSize2) {
-               if (batchSize1 < 1 || batchSize2 < 1) {
-                       throw new IllegalArgumentException("Batch size must be 
positive");
-               }
-               if (slideSize1 < 1 || slideSize2 < 1) {
-                       throw new IllegalArgumentException("Slide size must be 
positive");
-               }
-               return new CoBatchedDataStream<IN1, IN2>(this, batchSize1, 
batchSize2, slideSize1,
-                               slideSize2);
-       }
-
-       /**
-        * Batch operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into batches creating a new
-        * {@link CoBatchedDataStream}. Then the user can apply
-        * {@link CoBatchedDataStream#reduce} transformation on the
-        * {@link CoBatchedDataStream}.
-        * 
-        * @param batchSize1
-        *            The number of elements in each batch of the first input 
data
-        *            stream
-        * @param batchSize2
-        *            The number of elements in each batch of the second input 
data
-        *            stream
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long 
batchSize2) {
-               return batch(batchSize1, batchSize2, batchSize1, batchSize2);
-       }
-
-       /**
-        * Window operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into sliding windows creating a new
-        * {@link CoWindowDataStream}. Then the user can apply
-        * {@link WindowDataStream#reduce} transformation on the
-        * {@link CoWindowDataStream}. The user can implement their own time 
stamps
-        * or use the system time by default.
-        * 
-        * @param windowSize1
-        *            The length of the window of the first input data stream
-        * @param windowSize2
-        *            The length of the window of the second input data stream
-        * @param slideInterval1
-        *            The number of milliseconds with which the windows of the 
first
-        *            input data stream are slid by after each transformation
-        * @param slideInterval2
-        *            The number of milliseconds with which the windows of the
-        *            second input data stream are slid by after each 
transformation
-        * @param timeStamp1
-        *            User defined function for extracting time-stamps from each
-        *            element of the first input data stream
-        * @param timeStamp2
-        *            User defined function for extracting time-stamps from each
-        *            element of the second input data stream
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
-                       long slideInterval1, long slideInterval2, 
TimestampWrapper<IN1> timeStamp1,
-                       TimestampWrapper<IN2> timeStamp2) {
-               if (windowSize1 < 1 || windowSize2 < 1) {
-                       throw new IllegalArgumentException("Window size must be 
positive");
-               }
-               if (slideInterval1 < 1 || slideInterval2 < 1) {
-                       throw new IllegalArgumentException("Slide interval must 
be positive");
-               }
-               return new CoWindowDataStream<IN1, IN2>(this, windowSize1, 
windowSize2, slideInterval1,
-                               slideInterval2, timeStamp1, timeStamp2);
-       }
-
-       /**
-        * Window operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into sliding windows creating a new
-        * {@link CoWindowDataStream}. Then the user can apply
-        * {@link WindowDataStream#reduce} transformation on the
-        * {@link CoWindowDataStream}.
-        * 
-        * @param windowSize1
-        *            The length of the window of the first input data stream in
-        *            milliseconds.
-        * @param windowSize2
-        *            The length of the window of the second input data stream 
in
-        *            milliseconds.
-        * @param slideInterval1
-        *            The number of milliseconds with which the windows of the 
first
-        *            input data stream are slid by after each transformation
-        * @param slideInterval2
-        *            The number of milliseconds with which the windows of the
-        *            second input data stream are slid by after each 
transformation
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       @SuppressWarnings("unchecked")
-       public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
-                       long slideInterval1, long slideInterval2) {
-               return window(windowSize1, windowSize2, slideInterval1, 
slideInterval2,
-                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
-                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
-       }
-
-       /**
-        * Window operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into windows creating a new
-        * {@link CoWindowDataStream}. Then the user can apply
-        * {@link WindowDataStream#reduce} transformation on the
-        * {@link CoWindowDataStream}. The user can implement their own time 
stamps
-        * or use the system time by default.
-        * 
-        * @param windowSize1
-        *            The length of the window of the first input data stream
-        * @param windowSize2
-        *            The length of the window of the second input data stream
-        * @param timeStamp1
-        *            User defined function for extracting time-stamps from each
-        *            element of the first input data stream
-        * @param timeStamp2
-        *            User defined function for extracting time-stamps from each
-        *            element of the second input data stream
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2,
-                       TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> 
timeStamp2) {
-               return window(windowSize1, windowSize2, windowSize1, 
windowSize2, timeStamp1, timeStamp2);
-       }
-
-       /**
-        * Window operation for connected data stream. Collects each input data
-        * stream's elements simultaneously into windows creating a new
-        * {@link CoWindowDataStream}. Then the user can apply
-        * {@link WindowDataStream#reduce} transformation on the
-        * {@link CoWindowDataStream}.
-        * 
-        * @param windowSize1
-        *            The length of the window of the first input data stream in
-        *            milliseconds
-        * @param windowSize2
-        *            The length of the window of the second input data stream 
in
-        *            milliseconds
-        * @return The transformed {@link ConnectedDataStream}
-        */
-       @SuppressWarnings("unchecked")
-       public CoWindowDataStream<IN1, IN2> window(long windowSize1, long 
windowSize2) {
-               return window(windowSize1, windowSize2, windowSize1, 
windowSize2,
-                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
-                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
-       }
-
-       /**
         * Applies a CoMap transformation on a {@link ConnectedDataStream} and 
maps
         * the output to a common type. The transformation calls a
         * {@link CoMapFunction#map1} for each element of the first input and

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
deleted file mode 100644
index 4ed49fd..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.state.NullableCircularBuffer;
-
-public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, 
IN2, OUT> {
-
-       private static final long serialVersionUID = 1L;
-       protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-
-       protected long slideSize1;
-       protected long slideSize2;
-       protected long batchSize1;
-       protected long batchSize2;
-       protected int granularity1;
-       protected int granularity2;
-       protected long batchPerSlide1;
-       protected long batchPerSlide2;
-       protected long numberOfBatches1;
-       protected long numberOfBatches2;
-       protected StreamBatch<IN1> batch1;
-       protected StreamBatch<IN2> batch2;
-       protected StreamBatch<IN1> currentBatch1;
-       protected StreamBatch<IN2> currentBatch2;
-
-       public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer, long batchSize1,
-                       long batchSize2, long slideSize1, long slideSize2) {
-               super(coReducer);
-               this.coReducer = coReducer;
-               this.batchSize1 = batchSize1;
-               this.batchSize2 = batchSize2;
-               this.slideSize1 = slideSize1;
-               this.slideSize2 = slideSize2;
-               this.granularity1 = (int) MathUtils.gcd(batchSize1, slideSize1);
-               this.granularity2 = (int) MathUtils.gcd(batchSize2, slideSize2);
-               this.batchPerSlide1 = slideSize1 / granularity1;
-               this.batchPerSlide2 = slideSize2 / granularity2;
-               this.numberOfBatches1 = batchSize1 / granularity1;
-               this.numberOfBatches2 = batchSize2 / granularity2;
-       }
-
-       @Override
-       public void invoke() throws Exception {
-               while (true) {
-                       int next = recordIterator.next(reuse1, reuse2);
-                       if (next == 0) {
-                               reduceLastBatch1();
-                               reduceLastBatch2();
-                               break;
-                       } else if (next == 1) {
-                               handleStream1();
-                               resetReuse1();
-                       } else {
-                               handleStream2();
-                               resetReuse2();
-                       }
-               }
-       }
-
-       @Override
-       protected void handleStream1() throws Exception {
-               StreamBatch<IN1> batch1 = getBatch1(reuse1);
-               reduceToBuffer1(reuse1.getObject(), batch1);
-       }
-
-       @Override
-       protected void handleStream2() throws Exception {
-               StreamBatch<IN2> batch2 = getBatch2(reuse2);
-               reduceToBuffer2(reuse2.getObject(), batch2);
-       }
-
-       protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-               return batch1;
-       }
-
-       protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-               return batch2;
-       }
-
-       protected void reduce1(StreamBatch<IN1> batch) {
-               this.currentBatch1 = batch;
-               callUserFunctionAndLogException1();
-       }
-
-       protected void reduce2(StreamBatch<IN2> batch) {
-               this.currentBatch2 = batch;
-               callUserFunctionAndLogException2();
-       }
-
-       protected void reduceLastBatch1() throws Exception {
-               reduceLastBatch1(batch1);
-       }
-
-       protected void reduceLastBatch2() throws Exception {
-               reduceLastBatch2(batch2);
-       }
-
-       @Override
-       protected void callUserFunction1() throws Exception {
-               Iterator<IN1> reducedIterator = currentBatch1.getIterator();
-               IN1 reduced = null;
-
-               while (reducedIterator.hasNext() && reduced == null) {
-                       reduced = reducedIterator.next();
-               }
-
-               while (reducedIterator.hasNext()) {
-                       IN1 next = reducedIterator.next();
-                       if (next != null) {
-                               reduced = 
coReducer.reduce1(serializer1.copy(reduced), serializer1.copy(next));
-                       }
-               }
-               if (reduced != null) {
-                       
collector.collect(coReducer.map1(serializer1.copy(reduced)));
-               }
-       }
-
-       @Override
-       protected void callUserFunction2() throws Exception {
-               Iterator<IN2> reducedIterator = currentBatch2.getIterator();
-               IN2 reduced = null;
-
-               while (reducedIterator.hasNext() && reduced == null) {
-                       reduced = reducedIterator.next();
-               }
-
-               while (reducedIterator.hasNext()) {
-                       IN2 next = reducedIterator.next();
-                       if (next != null) {
-                               reduced = 
coReducer.reduce2(serializer2.copy(reduced), serializer2.copy(next));
-                       }
-               }
-               if (reduced != null) {
-                       
collector.collect(coReducer.map2(serializer2.copy(reduced)));
-               }
-       }
-
-       @Override
-       public void open(Configuration config) throws Exception {
-               super.open(config);
-               this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
-               this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
-       }
-
-       public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> 
streamBatch) throws Exception {
-
-               if (streamBatch.currentValue != null) {
-                       streamBatch.currentValue = coReducer.reduce1(
-                                       
serializer1.copy(streamBatch.currentValue), serializer1.copy(nextValue));
-               } else {
-                       streamBatch.currentValue = nextValue;
-               }
-
-               streamBatch.counter++;
-
-               if (streamBatch.miniBatchEnd()) {
-                       streamBatch.addToBuffer();
-                       if (streamBatch.batchEnd()) {
-                               reduceBatch1(streamBatch);
-                       }
-               }
-       }
-
-       public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> 
streamBatch) throws Exception {
-
-               if (streamBatch.currentValue != null) {
-                       streamBatch.currentValue = coReducer.reduce2(
-                                       
serializer2.copy(streamBatch.currentValue), serializer2.copy(nextValue));
-               } else {
-                       streamBatch.currentValue = nextValue;
-               }
-
-               streamBatch.counter++;
-
-               if (streamBatch.miniBatchEnd()) {
-                       streamBatch.addToBuffer();
-                       if (streamBatch.batchEnd()) {
-                               reduceBatch2(streamBatch);
-                       }
-               }
-       }
-
-       public void reduceLastBatch1(StreamBatch<IN1> streamBatch) throws 
Exception {
-               if (streamBatch.miniBatchInProgress()) {
-                       streamBatch.addToBuffer();
-               }
-
-               if (streamBatch.changed == true && streamBatch.minibatchCounter 
>= 0) {
-                       if (streamBatch.circularBuffer.isFull()) {
-                               for (long i = 0; i < (numberOfBatches1 - 
streamBatch.minibatchCounter); i++) {
-                                       if 
(!streamBatch.circularBuffer.isEmpty()) {
-                                               
streamBatch.circularBuffer.remove();
-                                       }
-                               }
-                       }
-                       if (!streamBatch.circularBuffer.isEmpty()) {
-                               reduce1(streamBatch);
-                       }
-               }
-
-       }
-
-       public void reduceLastBatch2(StreamBatch<IN2> streamBatch) throws 
Exception {
-               if (streamBatch.miniBatchInProgress()) {
-                       streamBatch.addToBuffer();
-               }
-
-               if (streamBatch.changed == true && streamBatch.minibatchCounter 
>= 0) {
-                       if (streamBatch.circularBuffer.isFull()) {
-                               for (long i = 0; i < (numberOfBatches2 - 
streamBatch.minibatchCounter); i++) {
-                                       if 
(!streamBatch.circularBuffer.isEmpty()) {
-                                               
streamBatch.circularBuffer.remove();
-                                       }
-                               }
-                       }
-                       if (!streamBatch.circularBuffer.isEmpty()) {
-                               reduce2(streamBatch);
-                       }
-               }
-
-       }
-
-       public void reduceBatch1(StreamBatch<IN1> streamBatch) {
-               reduce1(streamBatch);
-               streamBatch.changed = false;
-       }
-
-       public void reduceBatch2(StreamBatch<IN2> streamBatch) {
-               reduce2(streamBatch);
-               streamBatch.changed = false;
-       }
-
-       protected class StreamBatch<IN> implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               protected long counter;
-               protected long minibatchCounter;
-               protected IN currentValue;
-               protected long batchSize;
-               protected long slideSize;
-               protected long granularity;
-               protected long batchPerSlide;
-               protected long numberOfBatches;
-               boolean changed;
-
-               protected NullableCircularBuffer circularBuffer;
-
-               public StreamBatch(long batchSize, long slideSize) {
-                       this.batchSize = batchSize;
-                       this.slideSize = slideSize;
-                       this.granularity = (int) MathUtils.gcd(batchSize, 
slideSize);
-                       this.batchPerSlide = slideSize / granularity;
-                       this.circularBuffer = new NullableCircularBuffer((int) 
(batchSize / granularity));
-                       this.counter = 0;
-                       this.minibatchCounter = 0;
-                       this.currentValue = null;
-                       this.numberOfBatches = batchSize / granularity;
-                       this.changed = false;
-
-               }
-
-               protected void addToBuffer() {
-                       circularBuffer.add(currentValue);
-                       changed = true;
-                       minibatchCounter++;
-                       currentValue = null;
-               }
-
-               protected boolean miniBatchEnd() {
-                       return (counter % granularity) == 0;
-               }
-
-               public boolean batchEnd() {
-                       if (counter == batchSize) {
-                               counter -= slideSize;
-                               minibatchCounter -= batchPerSlide;
-                               return true;
-                       }
-                       return false;
-               }
-
-               public boolean miniBatchInProgress() {
-                       return currentValue != null;
-               }
-
-               @SuppressWarnings("unchecked")
-               public Iterator<IN> getIterator() {
-                       return circularBuffer.iterator();
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
deleted file mode 100644
index 745f507..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-public class CoGroupedBatchReduceInvokable<IN1, IN2, OUT> extends
-               CoBatchReduceInvokable<IN1, IN2, OUT> {
-       private static final long serialVersionUID = 1L;
-       protected KeySelector<IN1, ?> keySelector1;
-       protected KeySelector<IN2, ?> keySelector2;
-       Map<Object, StreamBatch<IN1>> streamBatches1;
-       Map<Object, StreamBatch<IN2>> streamBatches2;
-
-       public CoGroupedBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer,
-                       long batchSize1, long batchSize2, long slideSize1, long 
slideSize2,
-                       KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> 
keySelector2) {
-               super(coReducer, batchSize1, batchSize2, slideSize1, 
slideSize2);
-               this.keySelector1 = keySelector1;
-               this.keySelector2 = keySelector2;
-               this.streamBatches1 = new HashMap<Object, StreamBatch<IN1>>();
-               this.streamBatches2 = new HashMap<Object, StreamBatch<IN2>>();
-       }
-
-       protected void reduceLastBatch1() throws Exception {
-               for (StreamBatch<IN1> batch : streamBatches1.values()) {
-                       reduceLastBatch1(batch);
-               }
-       }
-
-       protected void reduceLastBatch2() throws Exception {
-               for (StreamBatch<IN2> batch : streamBatches2.values()) {
-                       reduceLastBatch2(batch);
-               }
-       }
-
-       @Override
-       protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-               Object key = next.getKey(keySelector1);
-               StreamBatch<IN1> batch = streamBatches1.get(key);
-               if (batch == null) {
-                       batch = new StreamBatch<IN1>(batchSize1, slideSize1);
-                       streamBatches1.put(key, batch);
-               }
-               return batch;
-       }
-
-       @Override
-       protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-               Object key = next.getKey(keySelector2);
-               StreamBatch<IN2> batch = streamBatches2.get(key);
-               if (batch == null) {
-                       batch = new StreamBatch<IN2>(batchSize2, slideSize2);
-                       streamBatches2.put(key, batch);
-               }
-               return batch;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
deleted file mode 100644
index 736239f..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
-               CoWindowReduceInvokable<IN1, IN2, OUT> {
-       private static final long serialVersionUID = 1L;
-       protected KeySelector<IN1, ?> keySelector1;
-       protected KeySelector<IN2, ?> keySelector2;
-       private Map<Object, StreamWindow<IN1>> streamWindows1;
-       private Map<Object, StreamWindow<IN2>> streamWindows2;
-       private long currentMiniBatchCount1 = 0;
-       private long currentMiniBatchCount2 = 0;
-
-       public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer,
-                       long windowSize1, long windowSize2, long 
slideInterval1, long slideInterval2,
-                       KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> 
keySelector2,
-                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
-               super(coReducer, windowSize1, windowSize2, slideInterval1, 
slideInterval2, timestamp1,
-                               timestamp2);
-               this.keySelector1 = keySelector1;
-               this.keySelector2 = keySelector2;
-               this.streamWindows1 = new HashMap<Object, StreamWindow<IN1>>();
-               this.streamWindows2 = new HashMap<Object, StreamWindow<IN2>>();
-       }
-
-       @Override
-       protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
-               Object key = next.getKey(keySelector1);
-               StreamWindow<IN1> window = streamWindows1.get(key);
-               if (window == null) {
-                       window = new GroupedStreamWindow<IN1>(batchSize1, 
slideSize1);
-                       window.minibatchCounter = currentMiniBatchCount1;
-                       streamWindows1.put(key, window);
-               }
-               this.window1 = window;
-               return window;
-       }
-
-       @Override
-       protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
-               Object key = next.getKey(keySelector2);
-               StreamWindow<IN2> window = streamWindows2.get(key);
-               if (window == null) {
-                       window = new GroupedStreamWindow<IN2>(batchSize2, 
slideSize2);
-                       window.minibatchCounter = currentMiniBatchCount2;
-                       streamWindows2.put(key, window);
-               }
-               this.window2 = window;
-               return window;
-       }
-
-       private void addToAllBuffers1() {
-               for (StreamBatch<IN1> window : streamWindows1.values()) {
-                       window.addToBuffer();
-               }
-       }
-
-       private void addToAllBuffers2() {
-               for (StreamBatch<IN2> window : streamWindows2.values()) {
-                       window.addToBuffer();
-               }
-       }
-
-       private void reduceAllWindows1() {
-               for (StreamBatch<IN1> window : streamWindows1.values()) {
-                       window.minibatchCounter -= batchPerSlide1;
-                       reduceBatch1((StreamBatch<IN1>) window);
-               }
-       }
-
-       private void reduceAllWindows2() {
-               for (StreamBatch<IN2> window : streamWindows2.values()) {
-                       window.minibatchCounter -= batchPerSlide2;
-                       reduceBatch2((StreamBatch<IN2>) window);
-               }
-       }
-
-       @Override
-       protected void reduceLastBatch1() throws Exception {
-               for (StreamBatch<IN1> window : streamWindows1.values()) {
-                       reduceLastBatch1((StreamBatch<IN1>) window);
-               }
-       }
-
-       @Override
-       protected void reduceLastBatch2() throws Exception {
-               for (StreamBatch<IN2> window : streamWindows2.values()) {
-                       reduceLastBatch2((StreamBatch<IN2>) window);
-               }
-       }
-
-       @Override
-       protected synchronized void checkWindowEnd1(long timeStamp, 
StreamWindow<IN1> streamWindow) {
-               nextRecordTime1 = timeStamp;
-
-               while (miniBatchEnd1()) {
-                       addToAllBuffers1();
-                       if (streamWindow.batchEnd()) {
-                               reduceAllWindows1();
-                       }
-               }
-               currentMiniBatchCount1 = streamWindow.minibatchCounter;
-       }
-
-       @Override
-       protected synchronized void checkWindowEnd2(long timeStamp, 
StreamWindow<IN2> streamWindow) {
-               nextRecordTime2 = timeStamp;
-
-               while (miniBatchEnd2()) {
-                       addToAllBuffers2();
-                       if (streamWindow.batchEnd()) {
-                               reduceAllWindows2();
-                       }
-               }
-               currentMiniBatchCount2 = streamWindow.minibatchCounter;
-       }
-
-       protected class GroupedStreamWindow<IN> extends StreamWindow<IN> {
-               private static final long serialVersionUID = 1L;
-
-               public GroupedStreamWindow(long windowSize, long slideInterval) 
{
-                       super(windowSize, slideInterval);
-               }
-
-               @Override
-               public boolean batchEnd() {
-                       if (minibatchCounter == numberOfBatches) {
-                               return true;
-                       }
-                       return false;
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
deleted file mode 100644
index 0c8598f..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public class CoWindowReduceInvokable<IN1, IN2, OUT> extends 
CoBatchReduceInvokable<IN1, IN2, OUT> {
-       private static final long serialVersionUID = 1L;
-       protected long startTime1;
-       protected long startTime2;
-       protected long nextRecordTime1;
-       protected long nextRecordTime2;
-       protected TimestampWrapper<IN1> timestamp1;
-       protected TimestampWrapper<IN2> timestamp2;
-       protected StreamWindow<IN1> window1;
-       protected StreamWindow<IN2> window2;
-
-       public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> 
coReducer, long windowSize1,
-                       long windowSize2, long slideInterval1, long 
slideInterval2,
-                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
-               super(coReducer, windowSize1, windowSize2, slideInterval1, 
slideInterval2);
-               this.timestamp1 = timestamp1;
-               this.timestamp2 = timestamp2;
-               this.startTime1 = timestamp1.getStartTime();
-               this.startTime2 = timestamp2.getStartTime();
-
-       }
-
-       @Override
-       public void open(Configuration config) throws Exception {
-               super.open(config);
-               this.window1 = new StreamWindow<IN1>(batchSize1, slideSize1);
-               this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2);
-               this.batch1 = this.window1;
-               this.batch2 = this.window2;
-               if (timestamp1.isDefaultTimestamp()) {
-                       (new TimeCheck1()).start();
-               }
-               if (timestamp2.isDefaultTimestamp()) {
-                       (new TimeCheck2()).start();
-               }
-       }
-
-       @Override
-       public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> 
streamWindow) throws Exception {
-
-               checkWindowEnd1(timestamp1.getTimestamp(nextValue), 
(StreamWindow<IN1>) streamWindow);
-
-               if (streamWindow.currentValue != null) {
-                       streamWindow.currentValue = coReducer.reduce1(
-                                       
serializer1.copy(streamWindow.currentValue), serializer1.copy(nextValue));
-               } else {
-                       streamWindow.currentValue = nextValue;
-               }
-       }
-
-       @Override
-       public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> 
streamWindow) throws Exception {
-
-               checkWindowEnd2(timestamp2.getTimestamp(nextValue), 
(StreamWindow<IN2>) streamWindow);
-
-               if (streamWindow.currentValue != null) {
-                       streamWindow.currentValue = coReducer.reduce2(
-                                       
serializer2.copy(streamWindow.currentValue), serializer2.copy(nextValue));
-               } else {
-                       streamWindow.currentValue = nextValue;
-               }
-       }
-
-       protected synchronized void checkWindowEnd1(long timeStamp, 
StreamWindow<IN1> streamWindow) {
-               nextRecordTime1 = timeStamp;
-
-               while (miniBatchEnd1()) {
-                       streamWindow.addToBuffer();
-                       if (streamWindow.batchEnd()) {
-                               reduceBatch1(streamWindow);
-                       }
-               }
-       }
-
-       protected synchronized void checkWindowEnd2(long timeStamp, 
StreamWindow<IN2> streamWindow) {
-               nextRecordTime2 = timeStamp;
-
-               while (miniBatchEnd2()) {
-                       streamWindow.addToBuffer();
-                       if (streamWindow.batchEnd()) {
-                               reduceBatch2(streamWindow);
-                       }
-               }
-       }
-
-       protected boolean miniBatchEnd1() {
-               if (nextRecordTime1 < startTime1 + granularity1) {
-                       return false;
-               } else {
-                       startTime1 += granularity1;
-                       return true;
-               }
-       }
-
-       protected boolean miniBatchEnd2() {
-               if (nextRecordTime2 < startTime2 + granularity2) {
-                       return false;
-               } else {
-                       startTime2 += granularity2;
-                       return true;
-               }
-       }
-
-       @Override
-       public void reduceBatch1(StreamBatch<IN1> streamBatch) {
-               reduce1(streamBatch);
-       }
-
-       @Override
-       public void reduceBatch2(StreamBatch<IN2> streamBatch) {
-               reduce2(streamBatch);
-       }
-
-       protected class StreamWindow<IN> extends StreamBatch<IN> {
-               private static final long serialVersionUID = 1L;
-
-               public StreamWindow(long windowSize, long slideInterval) {
-                       super(windowSize, slideInterval);
-               }
-
-               @Override
-               public boolean batchEnd() {
-                       if (minibatchCounter == numberOfBatches) {
-                               minibatchCounter -= batchPerSlide;
-                               return true;
-                       }
-                       return false;
-               }
-
-       }
-
-       private class TimeCheck1 extends Thread {
-               @Override
-               public void run() {
-                       while (true) {
-                               try {
-                                       Thread.sleep(slideSize1);
-                               } catch (InterruptedException e) {
-                               }
-                               if (isRunning) {
-                                       
checkWindowEnd1(System.currentTimeMillis(), window1);
-                               } else {
-                                       break;
-                               }
-                       }
-               }
-       }
-
-       private class TimeCheck2 extends Thread {
-               @Override
-               public void run() {
-                       while (true) {
-                               try {
-                                       Thread.sleep(slideSize2);
-                               } catch (InterruptedException e) {
-                               }
-                               if (isRunning) {
-                                       
checkWindowEnd2(System.currentTimeMillis(), window2);
-                               } else {
-                                       break;
-                               }
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
deleted file mode 100644
index 1db286c..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoBatchReduceTest {
-
-       private static class MyCoReduceFunction implements 
CoReduceFunction<Integer, String, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Integer reduce1(Integer value1, Integer value2) {
-                       return value1 + value2;
-               }
-
-               @Override
-               public String reduce2(String value1, String value2) {
-                       return value1 + value2;
-               }
-
-               @Override
-               public String map1(Integer value) {
-                       return value.toString();
-               }
-
-               @Override
-               public String map2(String value) {
-                       return value;
-               }
-       }
-
-       @Test
-       public void coBatchReduceTest1() {
-
-               List<Integer> inputs = new ArrayList<Integer>();
-               for (Integer i = 1; i <= 10; i++) {
-                       inputs.add(i);
-               }
-
-               List<String> inputs2 = new ArrayList<String>();
-               inputs2.add("a");
-               inputs2.add("b");
-               inputs2.add("c");
-               inputs2.add("d");
-               inputs2.add("e");
-               inputs2.add("f");
-               inputs2.add("g");
-               inputs2.add("h");
-               inputs2.add("i");
-
-               CoBatchReduceInvokable<Integer, String, String> invokable = new 
CoBatchReduceInvokable<Integer, String, String>(
-                               new MyCoReduceFunction(), 4L, 3L, 4L, 3L);
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("10");
-               expected.add("26");
-               expected.add("19");
-               expected.add("abc");
-               expected.add("def");
-               expected.add("ghi");
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-
-               assertEquals(expected, result);
-
-       }
-
-       @Test
-       public void coBatchReduceTest2() {
-
-               List<Integer> inputs = new ArrayList<Integer>();
-               for (Integer i = 1; i <= 10; i++) {
-                       inputs.add(i);
-               }
-
-               List<String> inputs2 = new ArrayList<String>();
-               inputs2.add("a");
-               inputs2.add("b");
-               inputs2.add("c");
-               inputs2.add("d");
-               inputs2.add("e");
-               inputs2.add("f");
-               inputs2.add("g");
-               inputs2.add("h");
-               inputs2.add("i");
-
-               CoBatchReduceInvokable<Integer, String, String> invokable = new 
CoBatchReduceInvokable<Integer, String, String>(
-                               new MyCoReduceFunction(), 4L, 3L, 2L, 2L);
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("10");
-               expected.add("18");
-               expected.add("26");
-               expected.add("34");
-               expected.add("abc");
-               expected.add("cde");
-               expected.add("efg");
-               expected.add("ghi");
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-
-               assertEquals(expected, result);
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
deleted file mode 100644
index 1d0732c..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedBatchReduceTest {
-
-       KeySelector<Tuple2<String, String>, ?> keySelector1 = new 
KeySelector<Tuple2<String, String>, String>() {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, String> value) throws 
Exception {
-                       return value.f0;
-               }
-       };
-
-       KeySelector<Tuple2<String, Integer>, ?> keySelector2 = new 
KeySelector<Tuple2<String, Integer>, String>() {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
-                       return value.f0;
-               }
-       };
-
-       private static class MyCoReduceFunction implements
-                       CoReduceFunction<Tuple2<String, Integer>, 
Tuple2<String, String>, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> 
value1,
-                               Tuple2<String, Integer> value2) {
-                       return new Tuple2<String, Integer>("a", value1.f1 + 
value2.f1);
-               }
-
-               @Override
-               public Tuple2<String, String> reduce2(Tuple2<String, String> 
value1,
-                               Tuple2<String, String> value2) {
-                       return new Tuple2<String, String>("a", value1.f1 + 
value2.f1);
-               }
-
-               @Override
-               public String map1(Tuple2<String, Integer> value) {
-                       return value.f1.toString();
-               }
-
-               @Override
-               public String map2(Tuple2<String, String> value) {
-                       return value.f1;
-               }
-       }
-
-       @Test
-       public void coGroupedBatchReduceTest1() {
-
-               List<Tuple2<String, Integer>> inputs1 = new 
ArrayList<Tuple2<String, Integer>>();
-               inputs1.add(new Tuple2<String, Integer>("a", 1));
-               inputs1.add(new Tuple2<String, Integer>("a", 2));
-               inputs1.add(new Tuple2<String, Integer>("a", 3));
-               inputs1.add(new Tuple2<String, Integer>("a", 4));
-               inputs1.add(new Tuple2<String, Integer>("a", 5));
-               inputs1.add(new Tuple2<String, Integer>("b", 6));
-               inputs1.add(new Tuple2<String, Integer>("a", 7));
-               inputs1.add(new Tuple2<String, Integer>("b", 8));
-               inputs1.add(new Tuple2<String, Integer>("b", 9));
-               inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-               List<Tuple2<String, String>> inputs2 = new 
ArrayList<Tuple2<String, String>>();
-               inputs2.add(new Tuple2<String, String>("1", "a"));
-               inputs2.add(new Tuple2<String, String>("2", "b"));
-               inputs2.add(new Tuple2<String, String>("1", "c"));
-               inputs2.add(new Tuple2<String, String>("2", "d"));
-               inputs2.add(new Tuple2<String, String>("1", "e"));
-               inputs2.add(new Tuple2<String, String>("2", "f"));
-               inputs2.add(new Tuple2<String, String>("1", "g"));
-               inputs2.add(new Tuple2<String, String>("2", "h"));
-               inputs2.add(new Tuple2<String, String>("1", "i"));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("10");
-               expected.add("12");
-               expected.add("33");
-               expected.add("ace");
-               expected.add("gi");
-               expected.add("bdf");
-               expected.add("h");
-
-               CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, 
Tuple2<String, String>, String> invokable = new 
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, 
String>(
-                               new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 
keySelector2, keySelector1);
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-       }
-
-       @Test
-       public void coGroupedBatchReduceTest2() {
-
-               List<Tuple2<String, Integer>> inputs1 = new 
ArrayList<Tuple2<String, Integer>>();
-               inputs1.add(new Tuple2<String, Integer>("a", 1));
-               inputs1.add(new Tuple2<String, Integer>("a", 2));
-               inputs1.add(new Tuple2<String, Integer>("a", 3));
-               inputs1.add(new Tuple2<String, Integer>("a", 4));
-               inputs1.add(new Tuple2<String, Integer>("a", 5));
-               inputs1.add(new Tuple2<String, Integer>("b", 6));
-               inputs1.add(new Tuple2<String, Integer>("a", 7));
-               inputs1.add(new Tuple2<String, Integer>("b", 8));
-               inputs1.add(new Tuple2<String, Integer>("b", 9));
-               inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-               List<Tuple2<String, String>> inputs2 = new 
ArrayList<Tuple2<String, String>>();
-               inputs2.add(new Tuple2<String, String>("1", "a"));
-               inputs2.add(new Tuple2<String, String>("2", "b"));
-               inputs2.add(new Tuple2<String, String>("1", "c"));
-               inputs2.add(new Tuple2<String, String>("2", "d"));
-               inputs2.add(new Tuple2<String, String>("1", "e"));
-               inputs2.add(new Tuple2<String, String>("2", "f"));
-               inputs2.add(new Tuple2<String, String>("1", "g"));
-               inputs2.add(new Tuple2<String, String>("2", "h"));
-               inputs2.add(new Tuple2<String, String>("1", "i"));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("10");
-               expected.add("19");
-               expected.add("33");
-               expected.add("ace");
-               expected.add("egi");
-               expected.add("bdf");
-               expected.add("fh");
-
-               CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, 
Tuple2<String, String>, String> invokable = new 
CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, 
String>(
-                               new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 
keySelector2, keySelector1);
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
deleted file mode 100644
index 508366c..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedWindowReduceTest {
-
-       KeySelector<Tuple2<String, Integer>, ?> keySelector0 = new 
KeySelector<Tuple2<String, Integer>, String>() {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
-                       return value.f0;
-               }
-       };
-
-       KeySelector<Tuple2<String, String>, ?> keySelector1 = new 
KeySelector<Tuple2<String, String>, String>() {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, String> value) throws 
Exception {
-                       return value.f0;
-               }
-       };
-
-       private static class MyCoReduceFunction implements
-                       CoReduceFunction<Tuple2<String, Integer>, 
Tuple2<String, String>, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> 
value1,
-                               Tuple2<String, Integer> value2) {
-                       return new Tuple2<String, Integer>("a", value1.f1 + 
value2.f1);
-               }
-
-               @Override
-               public Tuple2<String, String> reduce2(Tuple2<String, String> 
value1,
-                               Tuple2<String, String> value2) {
-                       return new Tuple2<String, String>("a", value1.f1 + 
value2.f1);
-               }
-
-               @Override
-               public String map1(Tuple2<String, Integer> value) {
-                       return value.f1.toString();
-               }
-
-               @Override
-               public String map2(Tuple2<String, String> value) {
-                       return value.f1;
-               }
-       }
-
-       public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
-               private static final long serialVersionUID = 1L;
-
-               private Iterator<Long> timestamps;
-               private long start;
-
-               public MyTimeStamp(List<Long> timestamps) {
-                       super(null, 0);
-                       this.timestamps = timestamps.iterator();
-                       this.start = timestamps.get(0);
-               }
-
-               @Override
-               public long getTimestamp(T value) {
-                       long ts = timestamps.next();
-                       return ts;
-               }
-
-               @Override
-               public long getStartTime() {
-                       return start;
-               }
-       }
-
-       @Test
-       public void coGroupedWindowReduceTest1() {
-
-               List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 1L, 1L, 1L, 
2L, 4L, 5L, 6L);
-               List<Tuple2<String, Integer>> inputs1 = new 
ArrayList<Tuple2<String, Integer>>();
-               inputs1.add(new Tuple2<String, Integer>("a", 1));
-               inputs1.add(new Tuple2<String, Integer>("a", 2));
-               inputs1.add(new Tuple2<String, Integer>("a", 3));
-               inputs1.add(new Tuple2<String, Integer>("a", 4));
-               inputs1.add(new Tuple2<String, Integer>("a", 5));
-               inputs1.add(new Tuple2<String, Integer>("b", 6));
-               inputs1.add(new Tuple2<String, Integer>("a", 7));
-               inputs1.add(new Tuple2<String, Integer>("b", 8));
-               inputs1.add(new Tuple2<String, Integer>("b", 9));
-               inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-               List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 5L, 
5L, 6L, 7L);
-               List<Tuple2<String, String>> inputs2 = new 
ArrayList<Tuple2<String, String>>();
-               inputs2.add(new Tuple2<String, String>("1", "a"));
-               inputs2.add(new Tuple2<String, String>("2", "b"));
-               inputs2.add(new Tuple2<String, String>("1", "c"));
-               inputs2.add(new Tuple2<String, String>("2", "d"));
-               inputs2.add(new Tuple2<String, String>("1", "e"));
-               inputs2.add(new Tuple2<String, String>("2", "f"));
-               inputs2.add(new Tuple2<String, String>("1", "g"));
-               inputs2.add(new Tuple2<String, String>("2", "h"));
-               inputs2.add(new Tuple2<String, String>("1", "i"));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("6");
-               expected.add("22");
-               expected.add("27");
-               expected.add("ace");
-               expected.add("bd");
-               expected.add("g");
-               expected.add("fh");
-               expected.add("i");
-
-               CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, 
Tuple2<String, String>, String> invokable = new 
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, 
String>(
-                               new MyCoReduceFunction(), 4L, 3L, 4L, 3L, 
keySelector0, keySelector1,
-                               new MyTimeStamp<Tuple2<String, 
Integer>>(timestamps1),
-                               new MyTimeStamp<Tuple2<String, 
String>>(timestamps2));
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-       }
-
-       @Test
-       public void coGroupedWindowReduceTest2() {
-
-               List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 2L, 2L, 3L, 
4L, 4L, 5L, 6L);
-               List<Tuple2<String, Integer>> inputs1 = new 
ArrayList<Tuple2<String, Integer>>();
-               inputs1.add(new Tuple2<String, Integer>("a", 1));
-               inputs1.add(new Tuple2<String, Integer>("a", 2));
-               inputs1.add(new Tuple2<String, Integer>("a", 3));
-               inputs1.add(new Tuple2<String, Integer>("a", 4));
-               inputs1.add(new Tuple2<String, Integer>("a", 5));
-               inputs1.add(new Tuple2<String, Integer>("b", 6));
-               inputs1.add(new Tuple2<String, Integer>("a", 7));
-               inputs1.add(new Tuple2<String, Integer>("b", 8));
-               inputs1.add(new Tuple2<String, Integer>("b", 9));
-               inputs1.add(new Tuple2<String, Integer>("b", 10));
-
-               List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 3L, 
4L, 4L, 5L);
-               List<Tuple2<String, String>> inputs2 = new 
ArrayList<Tuple2<String, String>>();
-               inputs2.add(new Tuple2<String, String>("1", "a"));
-               inputs2.add(new Tuple2<String, String>("2", "b"));
-               inputs2.add(new Tuple2<String, String>("1", "c"));
-               inputs2.add(new Tuple2<String, String>("2", "d"));
-               inputs2.add(new Tuple2<String, String>("1", "e"));
-               inputs2.add(new Tuple2<String, String>("2", "f"));
-               inputs2.add(new Tuple2<String, String>("1", "g"));
-               inputs2.add(new Tuple2<String, String>("2", "h"));
-               inputs2.add(new Tuple2<String, String>("1", "i"));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("15");
-               expected.add("6");
-               expected.add("16");
-               expected.add("23");
-               expected.add("7");
-               expected.add("27");
-               expected.add("ace");
-               expected.add("bdf");
-               expected.add("egi");
-               expected.add("fh");
-
-               CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, 
Tuple2<String, String>, String> invokable = new 
CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, 
String>(
-                               new MyCoReduceFunction(), 4L, 3L, 2L, 2L, 
keySelector0, keySelector1,
-                               new MyTimeStamp<Tuple2<String, 
Integer>>(timestamps1),
-                               new MyTimeStamp<Tuple2<String, 
String>>(timestamps2));
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs1, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
deleted file mode 100644
index 035a021..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoWindowReduceTest {
-
-       private static class MyCoReduceFunction implements 
CoReduceFunction<Integer, String, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Integer reduce1(Integer value1, Integer value2) {
-                       return value1 + value2;
-               }
-
-               @Override
-               public String reduce2(String value1, String value2) {
-                       return value1 + value2;
-               }
-
-               @Override
-               public String map1(Integer value) {
-                       return value.toString();
-               }
-
-               @Override
-               public String map2(String value) {
-                       return value;
-               }
-       }
-
-       public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
-               private static final long serialVersionUID = 1L;
-
-               private Iterator<Long> timestamps;
-               private long start;
-
-               public MyTimeStamp(List<Long> timestamps) {
-                       super(null, 0);
-                       this.timestamps = timestamps.iterator();
-                       this.start = timestamps.get(0);
-               }
-
-               @Override
-               public long getTimestamp(T value) {
-                       long ts = timestamps.next();
-                       return ts;
-               }
-
-               @Override
-               public long getStartTime() {
-                       return start;
-               }
-       }
-
-       @Test
-       public void coWindowReduceTest1() {
-
-               List<Integer> inputs = new ArrayList<Integer>();
-               for (Integer i = 1; i <= 10; i++) {
-                       inputs.add(i);
-               }
-
-               List<String> inputs2 = new ArrayList<String>();
-               inputs2.add("a");
-               inputs2.add("b");
-               inputs2.add("c");
-               inputs2.add("d");
-               inputs2.add("e");
-               inputs2.add("f");
-               inputs2.add("g");
-               inputs2.add("h");
-               inputs2.add("i");
-
-               List<Long> timestamps1 = Arrays.asList(0L, 2L, 3L, 5L, 7L, 9L, 
10L, 11L, 11L, 13L);
-               List<Long> timestamps2 = Arrays.asList(0L, 1L, 1L, 2L, 2L, 3L, 
3L, 4L, 4L);
-
-               CoWindowReduceInvokable<Integer, String, String> invokable = 
new CoWindowReduceInvokable<Integer, String, String>(
-                               new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new 
MyTimeStamp<Integer>(timestamps1),
-                               new MyTimeStamp<String>(timestamps2));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("6");
-               expected.add("9");
-               expected.add("30");
-               expected.add("10");
-               expected.add("abcde");
-               expected.add("fghi");
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-
-       }
-
-       @Test
-       public void coWindowReduceTest2() {
-
-               List<Integer> inputs = new ArrayList<Integer>();
-               for (Integer i = 1; i <= 10; i++) {
-                       inputs.add(i);
-               }
-
-               List<String> inputs2 = new ArrayList<String>();
-               inputs2.add("a");
-               inputs2.add("b");
-               inputs2.add("c");
-               inputs2.add("d");
-               inputs2.add("e");
-               inputs2.add("f");
-               inputs2.add("g");
-               inputs2.add("h");
-               inputs2.add("i");
-
-               List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 
3L, 8L, 10L, 11L);
-               List<Long> timestamps2 = Arrays.asList(1L, 2L, 4L, 5L, 6L, 9L, 
10L, 11L, 13L);
-
-               CoWindowReduceInvokable<Integer, String, String> invokable = 
new CoWindowReduceInvokable<Integer, String, String>(
-                               new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new 
MyTimeStamp<Integer>(timestamps1),
-                               new MyTimeStamp<String>(timestamps2));
-
-               List<String> expected = new ArrayList<String>();
-               expected.add("28");
-               expected.add("18");
-               expected.add("8");
-               expected.add("27");
-               expected.add("ab");
-               expected.add("cd");
-               expected.add("de");
-               expected.add("f");
-               expected.add("fgh");
-               expected.add("hi");
-
-               List<String> result = MockCoContext.createAndExecute(invokable, 
inputs, inputs2);
-
-               Collections.sort(result);
-               Collections.sort(expected);
-               assertEquals(expected, result);
-
-       }
-
-}

Reply via email to