[streaming] Deleted obsolete parts of the connected stream api
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd28fcc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd28fcc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd28fcc Branch: refs/heads/release-0.8 Commit: fcd28fccd38a070f69676f48b2038af9c6e5eb4e Parents: fb86fde Author: Gyula Fora <[email protected]> Authored: Mon Jan 5 15:59:50 2015 +0100 Committer: mbalassi <[email protected]> Committed: Tue Jan 6 00:22:58 2015 +0100 ---------------------------------------------------------------------- .../api/datastream/CoBatchedDataStream.java | 126 -------- .../api/datastream/CoWindowDataStream.java | 112 ------- .../api/datastream/ConnectedDataStream.java | 167 ---------- .../operator/co/CoBatchReduceInvokable.java | 316 ------------------- .../co/CoGroupedBatchReduceInvokable.java | 79 ----- .../co/CoGroupedWindowReduceInvokable.java | 158 ---------- .../operator/co/CoWindowReduceInvokable.java | 189 ----------- .../invokable/operator/CoBatchReduceTest.java | 137 -------- .../operator/CoGroupedBatchReduceTest.java | 172 ---------- .../operator/CoGroupedWindowReduceTest.java | 211 ------------- .../invokable/operator/CoWindowReduceTest.java | 172 ---------- 11 files changed, 1839 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java deleted file mode 100644 index 3b58188..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoBatchedDataStream.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; - -/** - * A {@link CoBatchedDataStream} represents a two data stream whose elements are - * batched together in sliding batches. Operation - * {@link #reduce(ReduceFunction)} can be applied for each batch and the batch - * is slid afterwards. - * - * @param <IN1> - * The type of the first input data stream - * @param <IN2> - * The type of the second input data stream - */ -public class CoBatchedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> { - - protected long batchSize1; - protected long batchSize2; - protected long slideSize1; - protected long slideSize2; - - protected CoBatchedDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2, - long batchSize1, long batchSize2, long slideSize1, long slideSize2) { - super(dataStream1, dataStream2); - this.batchSize1 = batchSize1; - this.batchSize2 = batchSize2; - this.slideSize1 = slideSize1; - this.slideSize2 = slideSize2; - } - - protected CoBatchedDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long batchSize1, - long batchSize2, long slideSize1, long slideSize2) { - super(coDataStream); - this.batchSize1 = batchSize1; - this.batchSize2 = batchSize2; - this.slideSize1 = slideSize1; - this.slideSize2 = slideSize2; - } - - protected CoBatchedDataStream(CoBatchedDataStream<IN1, IN2> coBatchedDataStream) { - super(coBatchedDataStream); - this.batchSize1 = coBatchedDataStream.batchSize1; - this.batchSize2 = coBatchedDataStream.batchSize2; - this.slideSize1 = coBatchedDataStream.slideSize1; - this.slideSize2 = coBatchedDataStream.slideSize2; - } - - /** - * Groups the elements of the {@link CoBatchedDataStream} by the given key - * positions to be used with grouped operators. - * - * @param keyPosition1 - * The position of the field on which the first input data stream - * will be grouped. - * @param keyPosition2 - * The position of the field on which the second input data - * stream will be grouped. - * @return The transformed {@link CoBatchedDataStream} - */ - public CoBatchedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) { - return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1), - dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) { - return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1), - dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) { - return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(field1), - dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) { - return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(fields1), - dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1, - KeySelector<IN2, ?> keySelector2) { - return new CoBatchedDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1), - dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2); - } - - @Override - protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable( - CoReduceFunction<IN1, IN2, OUT> coReducer) { - CoBatchReduceInvokable<IN1, IN2, OUT> invokable; - if (isGrouped) { - invokable = new CoGroupedBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1, - batchSize2, slideSize1, slideSize2, keySelector1, keySelector2); - } else { - invokable = new CoBatchReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1, - batchSize2, slideSize1, slideSize2); - } - return invokable; - } - - protected CoBatchedDataStream<IN1, IN2> copy() { - return new CoBatchedDataStream<IN1, IN2>(this); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java deleted file mode 100644 index 9129f9e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; - -/** - * A {@link CoWindowDataStream} represents two data streams whose elements are - * batched together into sliding windows. Operation - * {@link #reduce(CoReduceFunction)} can be applied for each window. - * - * @param <IN1> - * The type of the first input data stream - * @param <IN2> - * The type of the second input data stream - */ -public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> { - TimestampWrapper<IN1> timeStamp1; - TimestampWrapper<IN2> timeStamp2; - - protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2, - long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, - TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { - super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2); - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - } - - protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1, - long windowSize2, long slideInterval1, long slideInterval2, - TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { - super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2); - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - } - - protected CoWindowDataStream(CoWindowDataStream<IN1, IN2> coWindowDataStream) { - super(coWindowDataStream); - this.timeStamp1 = coWindowDataStream.timeStamp1; - this.timeStamp2 = coWindowDataStream.timeStamp2; - } - - public CoWindowDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) { - return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1), - dataStream2.groupBy(keyPosition2), batchSize1, batchSize2, slideSize1, slideSize2, - timeStamp1, timeStamp2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) { - return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1), - dataStream2.groupBy(keyPositions2), batchSize1, batchSize2, slideSize1, slideSize2, - timeStamp1, timeStamp2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) { - return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(field1), - dataStream2.groupBy(field2), batchSize1, batchSize2, slideSize1, slideSize2, - timeStamp1, timeStamp2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) { - return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(fields1), - dataStream2.groupBy(fields2), batchSize1, batchSize2, slideSize1, slideSize2, - timeStamp1, timeStamp2); - } - - public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1, - KeySelector<IN2, ?> keySelector2) { - return new CoWindowDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1), - dataStream2.groupBy(keySelector2), batchSize1, batchSize2, slideSize1, slideSize2, - timeStamp1, timeStamp2); - } - - @Override - protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable( - CoReduceFunction<IN1, IN2, OUT> coReducer) { - CoWindowReduceInvokable<IN1, IN2, OUT> invokable; - if (isGrouped) { - invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), - batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, - timeStamp1, timeStamp2); - } else { - invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1, - batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2); - } - return invokable; - } - - protected CoWindowDataStream<IN1, IN2> copy() { - return new CoWindowDataStream<IN1, IN2>(this); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index efd9531..a0c8ff8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -227,173 +227,6 @@ public class ConnectedDataStream<IN1, IN2> { } /** - * Batch operation for connected data stream. Collects each input data - * stream's elements simultaneously into sliding batches creating a new - * {@link CoBatchedDataStream}. Then the user can apply - * {@link CoBatchedDataStream#reduce} transformation on the - * {@link CoBatchedDataStream}. - * - * @param batchSize1 - * The number of elements in each batch of the first input data - * stream - * @param batchSize2 - * The number of elements in each batch of the second input data - * stream - * @param slideSize1 - * The number of elements with which the batches of the first - * input data stream are slid by after each transformation. - * @param slideSize2 - * The number of elements with which the batches of the second - * input data stream are slid by after each transformation. - * @return The transformed {@link ConnectedDataStream} - */ - public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2, long slideSize1, - long slideSize2) { - if (batchSize1 < 1 || batchSize2 < 1) { - throw new IllegalArgumentException("Batch size must be positive"); - } - if (slideSize1 < 1 || slideSize2 < 1) { - throw new IllegalArgumentException("Slide size must be positive"); - } - return new CoBatchedDataStream<IN1, IN2>(this, batchSize1, batchSize2, slideSize1, - slideSize2); - } - - /** - * Batch operation for connected data stream. Collects each input data - * stream's elements simultaneously into batches creating a new - * {@link CoBatchedDataStream}. Then the user can apply - * {@link CoBatchedDataStream#reduce} transformation on the - * {@link CoBatchedDataStream}. - * - * @param batchSize1 - * The number of elements in each batch of the first input data - * stream - * @param batchSize2 - * The number of elements in each batch of the second input data - * stream - * @return The transformed {@link ConnectedDataStream} - */ - public CoBatchedDataStream<IN1, IN2> batch(long batchSize1, long batchSize2) { - return batch(batchSize1, batchSize2, batchSize1, batchSize2); - } - - /** - * Window operation for connected data stream. Collects each input data - * stream's elements simultaneously into sliding windows creating a new - * {@link CoWindowDataStream}. Then the user can apply - * {@link WindowDataStream#reduce} transformation on the - * {@link CoWindowDataStream}. The user can implement their own time stamps - * or use the system time by default. - * - * @param windowSize1 - * The length of the window of the first input data stream - * @param windowSize2 - * The length of the window of the second input data stream - * @param slideInterval1 - * The number of milliseconds with which the windows of the first - * input data stream are slid by after each transformation - * @param slideInterval2 - * The number of milliseconds with which the windows of the - * second input data stream are slid by after each transformation - * @param timeStamp1 - * User defined function for extracting time-stamps from each - * element of the first input data stream - * @param timeStamp2 - * User defined function for extracting time-stamps from each - * element of the second input data stream - * @return The transformed {@link ConnectedDataStream} - */ - public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, - long slideInterval1, long slideInterval2, TimestampWrapper<IN1> timeStamp1, - TimestampWrapper<IN2> timeStamp2) { - if (windowSize1 < 1 || windowSize2 < 1) { - throw new IllegalArgumentException("Window size must be positive"); - } - if (slideInterval1 < 1 || slideInterval2 < 1) { - throw new IllegalArgumentException("Slide interval must be positive"); - } - return new CoWindowDataStream<IN1, IN2>(this, windowSize1, windowSize2, slideInterval1, - slideInterval2, timeStamp1, timeStamp2); - } - - /** - * Window operation for connected data stream. Collects each input data - * stream's elements simultaneously into sliding windows creating a new - * {@link CoWindowDataStream}. Then the user can apply - * {@link WindowDataStream#reduce} transformation on the - * {@link CoWindowDataStream}. - * - * @param windowSize1 - * The length of the window of the first input data stream in - * milliseconds. - * @param windowSize2 - * The length of the window of the second input data stream in - * milliseconds. - * @param slideInterval1 - * The number of milliseconds with which the windows of the first - * input data stream are slid by after each transformation - * @param slideInterval2 - * The number of milliseconds with which the windows of the - * second input data stream are slid by after each transformation - * @return The transformed {@link ConnectedDataStream} - */ - @SuppressWarnings("unchecked") - public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, - long slideInterval1, long slideInterval2) { - return window(windowSize1, windowSize2, slideInterval1, slideInterval2, - (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(), - (TimestampWrapper<IN2>) SystemTimestamp.getWrapper()); - } - - /** - * Window operation for connected data stream. Collects each input data - * stream's elements simultaneously into windows creating a new - * {@link CoWindowDataStream}. Then the user can apply - * {@link WindowDataStream#reduce} transformation on the - * {@link CoWindowDataStream}. The user can implement their own time stamps - * or use the system time by default. - * - * @param windowSize1 - * The length of the window of the first input data stream - * @param windowSize2 - * The length of the window of the second input data stream - * @param timeStamp1 - * User defined function for extracting time-stamps from each - * element of the first input data stream - * @param timeStamp2 - * User defined function for extracting time-stamps from each - * element of the second input data stream - * @return The transformed {@link ConnectedDataStream} - */ - public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2, - TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) { - return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2); - } - - /** - * Window operation for connected data stream. Collects each input data - * stream's elements simultaneously into windows creating a new - * {@link CoWindowDataStream}. Then the user can apply - * {@link WindowDataStream#reduce} transformation on the - * {@link CoWindowDataStream}. - * - * @param windowSize1 - * The length of the window of the first input data stream in - * milliseconds - * @param windowSize2 - * The length of the window of the second input data stream in - * milliseconds - * @return The transformed {@link ConnectedDataStream} - */ - @SuppressWarnings("unchecked") - public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) { - return window(windowSize1, windowSize2, windowSize1, windowSize2, - (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(), - (TimestampWrapper<IN2>) SystemTimestamp.getWrapper()); - } - - /** * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps * the output to a common type. The transformation calls a * {@link CoMapFunction#map1} for each element of the first input and http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java deleted file mode 100644 index 4ed49fd..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.commons.math.util.MathUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.state.NullableCircularBuffer; - -public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { - - private static final long serialVersionUID = 1L; - protected CoReduceFunction<IN1, IN2, OUT> coReducer; - - protected long slideSize1; - protected long slideSize2; - protected long batchSize1; - protected long batchSize2; - protected int granularity1; - protected int granularity2; - protected long batchPerSlide1; - protected long batchPerSlide2; - protected long numberOfBatches1; - protected long numberOfBatches2; - protected StreamBatch<IN1> batch1; - protected StreamBatch<IN2> batch2; - protected StreamBatch<IN1> currentBatch1; - protected StreamBatch<IN2> currentBatch2; - - public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, - long batchSize2, long slideSize1, long slideSize2) { - super(coReducer); - this.coReducer = coReducer; - this.batchSize1 = batchSize1; - this.batchSize2 = batchSize2; - this.slideSize1 = slideSize1; - this.slideSize2 = slideSize2; - this.granularity1 = (int) MathUtils.gcd(batchSize1, slideSize1); - this.granularity2 = (int) MathUtils.gcd(batchSize2, slideSize2); - this.batchPerSlide1 = slideSize1 / granularity1; - this.batchPerSlide2 = slideSize2 / granularity2; - this.numberOfBatches1 = batchSize1 / granularity1; - this.numberOfBatches2 = batchSize2 / granularity2; - } - - @Override - public void invoke() throws Exception { - while (true) { - int next = recordIterator.next(reuse1, reuse2); - if (next == 0) { - reduceLastBatch1(); - reduceLastBatch2(); - break; - } else if (next == 1) { - handleStream1(); - resetReuse1(); - } else { - handleStream2(); - resetReuse2(); - } - } - } - - @Override - protected void handleStream1() throws Exception { - StreamBatch<IN1> batch1 = getBatch1(reuse1); - reduceToBuffer1(reuse1.getObject(), batch1); - } - - @Override - protected void handleStream2() throws Exception { - StreamBatch<IN2> batch2 = getBatch2(reuse2); - reduceToBuffer2(reuse2.getObject(), batch2); - } - - protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) { - return batch1; - } - - protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) { - return batch2; - } - - protected void reduce1(StreamBatch<IN1> batch) { - this.currentBatch1 = batch; - callUserFunctionAndLogException1(); - } - - protected void reduce2(StreamBatch<IN2> batch) { - this.currentBatch2 = batch; - callUserFunctionAndLogException2(); - } - - protected void reduceLastBatch1() throws Exception { - reduceLastBatch1(batch1); - } - - protected void reduceLastBatch2() throws Exception { - reduceLastBatch2(batch2); - } - - @Override - protected void callUserFunction1() throws Exception { - Iterator<IN1> reducedIterator = currentBatch1.getIterator(); - IN1 reduced = null; - - while (reducedIterator.hasNext() && reduced == null) { - reduced = reducedIterator.next(); - } - - while (reducedIterator.hasNext()) { - IN1 next = reducedIterator.next(); - if (next != null) { - reduced = coReducer.reduce1(serializer1.copy(reduced), serializer1.copy(next)); - } - } - if (reduced != null) { - collector.collect(coReducer.map1(serializer1.copy(reduced))); - } - } - - @Override - protected void callUserFunction2() throws Exception { - Iterator<IN2> reducedIterator = currentBatch2.getIterator(); - IN2 reduced = null; - - while (reducedIterator.hasNext() && reduced == null) { - reduced = reducedIterator.next(); - } - - while (reducedIterator.hasNext()) { - IN2 next = reducedIterator.next(); - if (next != null) { - reduced = coReducer.reduce2(serializer2.copy(reduced), serializer2.copy(next)); - } - } - if (reduced != null) { - collector.collect(coReducer.map2(serializer2.copy(reduced))); - } - } - - @Override - public void open(Configuration config) throws Exception { - super.open(config); - this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1); - this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2); - } - - public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> streamBatch) throws Exception { - - if (streamBatch.currentValue != null) { - streamBatch.currentValue = coReducer.reduce1( - serializer1.copy(streamBatch.currentValue), serializer1.copy(nextValue)); - } else { - streamBatch.currentValue = nextValue; - } - - streamBatch.counter++; - - if (streamBatch.miniBatchEnd()) { - streamBatch.addToBuffer(); - if (streamBatch.batchEnd()) { - reduceBatch1(streamBatch); - } - } - } - - public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> streamBatch) throws Exception { - - if (streamBatch.currentValue != null) { - streamBatch.currentValue = coReducer.reduce2( - serializer2.copy(streamBatch.currentValue), serializer2.copy(nextValue)); - } else { - streamBatch.currentValue = nextValue; - } - - streamBatch.counter++; - - if (streamBatch.miniBatchEnd()) { - streamBatch.addToBuffer(); - if (streamBatch.batchEnd()) { - reduceBatch2(streamBatch); - } - } - } - - public void reduceLastBatch1(StreamBatch<IN1> streamBatch) throws Exception { - if (streamBatch.miniBatchInProgress()) { - streamBatch.addToBuffer(); - } - - if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) { - if (streamBatch.circularBuffer.isFull()) { - for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) { - if (!streamBatch.circularBuffer.isEmpty()) { - streamBatch.circularBuffer.remove(); - } - } - } - if (!streamBatch.circularBuffer.isEmpty()) { - reduce1(streamBatch); - } - } - - } - - public void reduceLastBatch2(StreamBatch<IN2> streamBatch) throws Exception { - if (streamBatch.miniBatchInProgress()) { - streamBatch.addToBuffer(); - } - - if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) { - if (streamBatch.circularBuffer.isFull()) { - for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) { - if (!streamBatch.circularBuffer.isEmpty()) { - streamBatch.circularBuffer.remove(); - } - } - } - if (!streamBatch.circularBuffer.isEmpty()) { - reduce2(streamBatch); - } - } - - } - - public void reduceBatch1(StreamBatch<IN1> streamBatch) { - reduce1(streamBatch); - streamBatch.changed = false; - } - - public void reduceBatch2(StreamBatch<IN2> streamBatch) { - reduce2(streamBatch); - streamBatch.changed = false; - } - - protected class StreamBatch<IN> implements Serializable { - private static final long serialVersionUID = 1L; - - protected long counter; - protected long minibatchCounter; - protected IN currentValue; - protected long batchSize; - protected long slideSize; - protected long granularity; - protected long batchPerSlide; - protected long numberOfBatches; - boolean changed; - - protected NullableCircularBuffer circularBuffer; - - public StreamBatch(long batchSize, long slideSize) { - this.batchSize = batchSize; - this.slideSize = slideSize; - this.granularity = (int) MathUtils.gcd(batchSize, slideSize); - this.batchPerSlide = slideSize / granularity; - this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity)); - this.counter = 0; - this.minibatchCounter = 0; - this.currentValue = null; - this.numberOfBatches = batchSize / granularity; - this.changed = false; - - } - - protected void addToBuffer() { - circularBuffer.add(currentValue); - changed = true; - minibatchCounter++; - currentValue = null; - } - - protected boolean miniBatchEnd() { - return (counter % granularity) == 0; - } - - public boolean batchEnd() { - if (counter == batchSize) { - counter -= slideSize; - minibatchCounter -= batchPerSlide; - return true; - } - return false; - } - - public boolean miniBatchInProgress() { - return currentValue != null; - } - - @SuppressWarnings("unchecked") - public Iterator<IN> getIterator() { - return circularBuffer.iterator(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java deleted file mode 100644 index 745f507..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchReduceInvokable.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; - -public class CoGroupedBatchReduceInvokable<IN1, IN2, OUT> extends - CoBatchReduceInvokable<IN1, IN2, OUT> { - private static final long serialVersionUID = 1L; - protected KeySelector<IN1, ?> keySelector1; - protected KeySelector<IN2, ?> keySelector2; - Map<Object, StreamBatch<IN1>> streamBatches1; - Map<Object, StreamBatch<IN2>> streamBatches2; - - public CoGroupedBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, - long batchSize1, long batchSize2, long slideSize1, long slideSize2, - KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) { - super(coReducer, batchSize1, batchSize2, slideSize1, slideSize2); - this.keySelector1 = keySelector1; - this.keySelector2 = keySelector2; - this.streamBatches1 = new HashMap<Object, StreamBatch<IN1>>(); - this.streamBatches2 = new HashMap<Object, StreamBatch<IN2>>(); - } - - protected void reduceLastBatch1() throws Exception { - for (StreamBatch<IN1> batch : streamBatches1.values()) { - reduceLastBatch1(batch); - } - } - - protected void reduceLastBatch2() throws Exception { - for (StreamBatch<IN2> batch : streamBatches2.values()) { - reduceLastBatch2(batch); - } - } - - @Override - protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) { - Object key = next.getKey(keySelector1); - StreamBatch<IN1> batch = streamBatches1.get(key); - if (batch == null) { - batch = new StreamBatch<IN1>(batchSize1, slideSize1); - streamBatches1.put(key, batch); - } - return batch; - } - - @Override - protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) { - Object key = next.getKey(keySelector2); - StreamBatch<IN2> batch = streamBatches2.get(key); - if (batch == null) { - batch = new StreamBatch<IN2>(batchSize2, slideSize2); - streamBatches2.put(key, batch); - } - return batch; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java deleted file mode 100644 index 736239f..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; - -public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends - CoWindowReduceInvokable<IN1, IN2, OUT> { - private static final long serialVersionUID = 1L; - protected KeySelector<IN1, ?> keySelector1; - protected KeySelector<IN2, ?> keySelector2; - private Map<Object, StreamWindow<IN1>> streamWindows1; - private Map<Object, StreamWindow<IN2>> streamWindows2; - private long currentMiniBatchCount1 = 0; - private long currentMiniBatchCount2 = 0; - - public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, - long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, - KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2, - TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) { - super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1, - timestamp2); - this.keySelector1 = keySelector1; - this.keySelector2 = keySelector2; - this.streamWindows1 = new HashMap<Object, StreamWindow<IN1>>(); - this.streamWindows2 = new HashMap<Object, StreamWindow<IN2>>(); - } - - @Override - protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) { - Object key = next.getKey(keySelector1); - StreamWindow<IN1> window = streamWindows1.get(key); - if (window == null) { - window = new GroupedStreamWindow<IN1>(batchSize1, slideSize1); - window.minibatchCounter = currentMiniBatchCount1; - streamWindows1.put(key, window); - } - this.window1 = window; - return window; - } - - @Override - protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) { - Object key = next.getKey(keySelector2); - StreamWindow<IN2> window = streamWindows2.get(key); - if (window == null) { - window = new GroupedStreamWindow<IN2>(batchSize2, slideSize2); - window.minibatchCounter = currentMiniBatchCount2; - streamWindows2.put(key, window); - } - this.window2 = window; - return window; - } - - private void addToAllBuffers1() { - for (StreamBatch<IN1> window : streamWindows1.values()) { - window.addToBuffer(); - } - } - - private void addToAllBuffers2() { - for (StreamBatch<IN2> window : streamWindows2.values()) { - window.addToBuffer(); - } - } - - private void reduceAllWindows1() { - for (StreamBatch<IN1> window : streamWindows1.values()) { - window.minibatchCounter -= batchPerSlide1; - reduceBatch1((StreamBatch<IN1>) window); - } - } - - private void reduceAllWindows2() { - for (StreamBatch<IN2> window : streamWindows2.values()) { - window.minibatchCounter -= batchPerSlide2; - reduceBatch2((StreamBatch<IN2>) window); - } - } - - @Override - protected void reduceLastBatch1() throws Exception { - for (StreamBatch<IN1> window : streamWindows1.values()) { - reduceLastBatch1((StreamBatch<IN1>) window); - } - } - - @Override - protected void reduceLastBatch2() throws Exception { - for (StreamBatch<IN2> window : streamWindows2.values()) { - reduceLastBatch2((StreamBatch<IN2>) window); - } - } - - @Override - protected synchronized void checkWindowEnd1(long timeStamp, StreamWindow<IN1> streamWindow) { - nextRecordTime1 = timeStamp; - - while (miniBatchEnd1()) { - addToAllBuffers1(); - if (streamWindow.batchEnd()) { - reduceAllWindows1(); - } - } - currentMiniBatchCount1 = streamWindow.minibatchCounter; - } - - @Override - protected synchronized void checkWindowEnd2(long timeStamp, StreamWindow<IN2> streamWindow) { - nextRecordTime2 = timeStamp; - - while (miniBatchEnd2()) { - addToAllBuffers2(); - if (streamWindow.batchEnd()) { - reduceAllWindows2(); - } - } - currentMiniBatchCount2 = streamWindow.minibatchCounter; - } - - protected class GroupedStreamWindow<IN> extends StreamWindow<IN> { - private static final long serialVersionUID = 1L; - - public GroupedStreamWindow(long windowSize, long slideInterval) { - super(windowSize, slideInterval); - } - - @Override - public boolean batchEnd() { - if (minibatchCounter == numberOfBatches) { - return true; - } - return false; - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java deleted file mode 100644 index 0c8598f..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator.co; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; - -public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> { - private static final long serialVersionUID = 1L; - protected long startTime1; - protected long startTime2; - protected long nextRecordTime1; - protected long nextRecordTime2; - protected TimestampWrapper<IN1> timestamp1; - protected TimestampWrapper<IN2> timestamp2; - protected StreamWindow<IN1> window1; - protected StreamWindow<IN2> window2; - - public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, - long windowSize2, long slideInterval1, long slideInterval2, - TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) { - super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2); - this.timestamp1 = timestamp1; - this.timestamp2 = timestamp2; - this.startTime1 = timestamp1.getStartTime(); - this.startTime2 = timestamp2.getStartTime(); - - } - - @Override - public void open(Configuration config) throws Exception { - super.open(config); - this.window1 = new StreamWindow<IN1>(batchSize1, slideSize1); - this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2); - this.batch1 = this.window1; - this.batch2 = this.window2; - if (timestamp1.isDefaultTimestamp()) { - (new TimeCheck1()).start(); - } - if (timestamp2.isDefaultTimestamp()) { - (new TimeCheck2()).start(); - } - } - - @Override - public void reduceToBuffer1(IN1 nextValue, StreamBatch<IN1> streamWindow) throws Exception { - - checkWindowEnd1(timestamp1.getTimestamp(nextValue), (StreamWindow<IN1>) streamWindow); - - if (streamWindow.currentValue != null) { - streamWindow.currentValue = coReducer.reduce1( - serializer1.copy(streamWindow.currentValue), serializer1.copy(nextValue)); - } else { - streamWindow.currentValue = nextValue; - } - } - - @Override - public void reduceToBuffer2(IN2 nextValue, StreamBatch<IN2> streamWindow) throws Exception { - - checkWindowEnd2(timestamp2.getTimestamp(nextValue), (StreamWindow<IN2>) streamWindow); - - if (streamWindow.currentValue != null) { - streamWindow.currentValue = coReducer.reduce2( - serializer2.copy(streamWindow.currentValue), serializer2.copy(nextValue)); - } else { - streamWindow.currentValue = nextValue; - } - } - - protected synchronized void checkWindowEnd1(long timeStamp, StreamWindow<IN1> streamWindow) { - nextRecordTime1 = timeStamp; - - while (miniBatchEnd1()) { - streamWindow.addToBuffer(); - if (streamWindow.batchEnd()) { - reduceBatch1(streamWindow); - } - } - } - - protected synchronized void checkWindowEnd2(long timeStamp, StreamWindow<IN2> streamWindow) { - nextRecordTime2 = timeStamp; - - while (miniBatchEnd2()) { - streamWindow.addToBuffer(); - if (streamWindow.batchEnd()) { - reduceBatch2(streamWindow); - } - } - } - - protected boolean miniBatchEnd1() { - if (nextRecordTime1 < startTime1 + granularity1) { - return false; - } else { - startTime1 += granularity1; - return true; - } - } - - protected boolean miniBatchEnd2() { - if (nextRecordTime2 < startTime2 + granularity2) { - return false; - } else { - startTime2 += granularity2; - return true; - } - } - - @Override - public void reduceBatch1(StreamBatch<IN1> streamBatch) { - reduce1(streamBatch); - } - - @Override - public void reduceBatch2(StreamBatch<IN2> streamBatch) { - reduce2(streamBatch); - } - - protected class StreamWindow<IN> extends StreamBatch<IN> { - private static final long serialVersionUID = 1L; - - public StreamWindow(long windowSize, long slideInterval) { - super(windowSize, slideInterval); - } - - @Override - public boolean batchEnd() { - if (minibatchCounter == numberOfBatches) { - minibatchCounter -= batchPerSlide; - return true; - } - return false; - } - - } - - private class TimeCheck1 extends Thread { - @Override - public void run() { - while (true) { - try { - Thread.sleep(slideSize1); - } catch (InterruptedException e) { - } - if (isRunning) { - checkWindowEnd1(System.currentTimeMillis(), window1); - } else { - break; - } - } - } - } - - private class TimeCheck2 extends Thread { - @Override - public void run() { - while (true) { - try { - Thread.sleep(slideSize2); - } catch (InterruptedException e) { - } - if (isRunning) { - checkWindowEnd2(System.currentTimeMillis(), window2); - } else { - break; - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java deleted file mode 100644 index 1db286c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoBatchReduceTest { - - private static class MyCoReduceFunction implements CoReduceFunction<Integer, String, String> { - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce1(Integer value1, Integer value2) { - return value1 + value2; - } - - @Override - public String reduce2(String value1, String value2) { - return value1 + value2; - } - - @Override - public String map1(Integer value) { - return value.toString(); - } - - @Override - public String map2(String value) { - return value; - } - } - - @Test - public void coBatchReduceTest1() { - - List<Integer> inputs = new ArrayList<Integer>(); - for (Integer i = 1; i <= 10; i++) { - inputs.add(i); - } - - List<String> inputs2 = new ArrayList<String>(); - inputs2.add("a"); - inputs2.add("b"); - inputs2.add("c"); - inputs2.add("d"); - inputs2.add("e"); - inputs2.add("f"); - inputs2.add("g"); - inputs2.add("h"); - inputs2.add("i"); - - CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>( - new MyCoReduceFunction(), 4L, 3L, 4L, 3L); - - List<String> expected = new ArrayList<String>(); - expected.add("10"); - expected.add("26"); - expected.add("19"); - expected.add("abc"); - expected.add("def"); - expected.add("ghi"); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); - - Collections.sort(result); - Collections.sort(expected); - - assertEquals(expected, result); - - } - - @Test - public void coBatchReduceTest2() { - - List<Integer> inputs = new ArrayList<Integer>(); - for (Integer i = 1; i <= 10; i++) { - inputs.add(i); - } - - List<String> inputs2 = new ArrayList<String>(); - inputs2.add("a"); - inputs2.add("b"); - inputs2.add("c"); - inputs2.add("d"); - inputs2.add("e"); - inputs2.add("f"); - inputs2.add("g"); - inputs2.add("h"); - inputs2.add("i"); - - CoBatchReduceInvokable<Integer, String, String> invokable = new CoBatchReduceInvokable<Integer, String, String>( - new MyCoReduceFunction(), 4L, 3L, 2L, 2L); - - List<String> expected = new ArrayList<String>(); - expected.add("10"); - expected.add("18"); - expected.add("26"); - expected.add("34"); - expected.add("abc"); - expected.add("cde"); - expected.add("efg"); - expected.add("ghi"); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); - - Collections.sort(result); - Collections.sort(expected); - - assertEquals(expected, result); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java deleted file mode 100644 index 1d0732c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoGroupedBatchReduceTest { - - KeySelector<Tuple2<String, String>, ?> keySelector1 = new KeySelector<Tuple2<String, String>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, String> value) throws Exception { - return value.f0; - } - }; - - KeySelector<Tuple2<String, Integer>, ?> keySelector2 = new KeySelector<Tuple2<String, Integer>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, Integer> value) throws Exception { - return value.f0; - } - }; - - private static class MyCoReduceFunction implements - CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1, - Tuple2<String, Integer> value2) { - return new Tuple2<String, Integer>("a", value1.f1 + value2.f1); - } - - @Override - public Tuple2<String, String> reduce2(Tuple2<String, String> value1, - Tuple2<String, String> value2) { - return new Tuple2<String, String>("a", value1.f1 + value2.f1); - } - - @Override - public String map1(Tuple2<String, Integer> value) { - return value.f1.toString(); - } - - @Override - public String map2(Tuple2<String, String> value) { - return value.f1; - } - } - - @Test - public void coGroupedBatchReduceTest1() { - - List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>(); - inputs1.add(new Tuple2<String, Integer>("a", 1)); - inputs1.add(new Tuple2<String, Integer>("a", 2)); - inputs1.add(new Tuple2<String, Integer>("a", 3)); - inputs1.add(new Tuple2<String, Integer>("a", 4)); - inputs1.add(new Tuple2<String, Integer>("a", 5)); - inputs1.add(new Tuple2<String, Integer>("b", 6)); - inputs1.add(new Tuple2<String, Integer>("a", 7)); - inputs1.add(new Tuple2<String, Integer>("b", 8)); - inputs1.add(new Tuple2<String, Integer>("b", 9)); - inputs1.add(new Tuple2<String, Integer>("b", 10)); - - List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>(); - inputs2.add(new Tuple2<String, String>("1", "a")); - inputs2.add(new Tuple2<String, String>("2", "b")); - inputs2.add(new Tuple2<String, String>("1", "c")); - inputs2.add(new Tuple2<String, String>("2", "d")); - inputs2.add(new Tuple2<String, String>("1", "e")); - inputs2.add(new Tuple2<String, String>("2", "f")); - inputs2.add(new Tuple2<String, String>("1", "g")); - inputs2.add(new Tuple2<String, String>("2", "h")); - inputs2.add(new Tuple2<String, String>("1", "i")); - - List<String> expected = new ArrayList<String>(); - expected.add("10"); - expected.add("12"); - expected.add("33"); - expected.add("ace"); - expected.add("gi"); - expected.add("bdf"); - expected.add("h"); - - CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>( - new MyCoReduceFunction(), 4L, 3L, 4L, 3L, keySelector2, keySelector1); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - } - - @Test - public void coGroupedBatchReduceTest2() { - - List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>(); - inputs1.add(new Tuple2<String, Integer>("a", 1)); - inputs1.add(new Tuple2<String, Integer>("a", 2)); - inputs1.add(new Tuple2<String, Integer>("a", 3)); - inputs1.add(new Tuple2<String, Integer>("a", 4)); - inputs1.add(new Tuple2<String, Integer>("a", 5)); - inputs1.add(new Tuple2<String, Integer>("b", 6)); - inputs1.add(new Tuple2<String, Integer>("a", 7)); - inputs1.add(new Tuple2<String, Integer>("b", 8)); - inputs1.add(new Tuple2<String, Integer>("b", 9)); - inputs1.add(new Tuple2<String, Integer>("b", 10)); - - List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>(); - inputs2.add(new Tuple2<String, String>("1", "a")); - inputs2.add(new Tuple2<String, String>("2", "b")); - inputs2.add(new Tuple2<String, String>("1", "c")); - inputs2.add(new Tuple2<String, String>("2", "d")); - inputs2.add(new Tuple2<String, String>("1", "e")); - inputs2.add(new Tuple2<String, String>("2", "f")); - inputs2.add(new Tuple2<String, String>("1", "g")); - inputs2.add(new Tuple2<String, String>("2", "h")); - inputs2.add(new Tuple2<String, String>("1", "i")); - - List<String> expected = new ArrayList<String>(); - expected.add("10"); - expected.add("19"); - expected.add("33"); - expected.add("ace"); - expected.add("egi"); - expected.add("bdf"); - expected.add("fh"); - - CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>( - new MyCoReduceFunction(), 4L, 3L, 2L, 2L, keySelector2, keySelector1); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java deleted file mode 100644 index 508366c..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoGroupedWindowReduceTest { - - KeySelector<Tuple2<String, Integer>, ?> keySelector0 = new KeySelector<Tuple2<String, Integer>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, Integer> value) throws Exception { - return value.f0; - } - }; - - KeySelector<Tuple2<String, String>, ?> keySelector1 = new KeySelector<Tuple2<String, String>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, String> value) throws Exception { - return value.f0; - } - }; - - private static class MyCoReduceFunction implements - CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1, - Tuple2<String, Integer> value2) { - return new Tuple2<String, Integer>("a", value1.f1 + value2.f1); - } - - @Override - public Tuple2<String, String> reduce2(Tuple2<String, String> value1, - Tuple2<String, String> value2) { - return new Tuple2<String, String>("a", value1.f1 + value2.f1); - } - - @Override - public String map1(Tuple2<String, Integer> value) { - return value.f1.toString(); - } - - @Override - public String map2(Tuple2<String, String> value) { - return value.f1; - } - } - - public static final class MyTimeStamp<T> extends TimestampWrapper<T> { - private static final long serialVersionUID = 1L; - - private Iterator<Long> timestamps; - private long start; - - public MyTimeStamp(List<Long> timestamps) { - super(null, 0); - this.timestamps = timestamps.iterator(); - this.start = timestamps.get(0); - } - - @Override - public long getTimestamp(T value) { - long ts = timestamps.next(); - return ts; - } - - @Override - public long getStartTime() { - return start; - } - } - - @Test - public void coGroupedWindowReduceTest1() { - - List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 1L, 1L, 1L, 2L, 4L, 5L, 6L); - List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>(); - inputs1.add(new Tuple2<String, Integer>("a", 1)); - inputs1.add(new Tuple2<String, Integer>("a", 2)); - inputs1.add(new Tuple2<String, Integer>("a", 3)); - inputs1.add(new Tuple2<String, Integer>("a", 4)); - inputs1.add(new Tuple2<String, Integer>("a", 5)); - inputs1.add(new Tuple2<String, Integer>("b", 6)); - inputs1.add(new Tuple2<String, Integer>("a", 7)); - inputs1.add(new Tuple2<String, Integer>("b", 8)); - inputs1.add(new Tuple2<String, Integer>("b", 9)); - inputs1.add(new Tuple2<String, Integer>("b", 10)); - - List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 5L, 5L, 6L, 7L); - List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>(); - inputs2.add(new Tuple2<String, String>("1", "a")); - inputs2.add(new Tuple2<String, String>("2", "b")); - inputs2.add(new Tuple2<String, String>("1", "c")); - inputs2.add(new Tuple2<String, String>("2", "d")); - inputs2.add(new Tuple2<String, String>("1", "e")); - inputs2.add(new Tuple2<String, String>("2", "f")); - inputs2.add(new Tuple2<String, String>("1", "g")); - inputs2.add(new Tuple2<String, String>("2", "h")); - inputs2.add(new Tuple2<String, String>("1", "i")); - - List<String> expected = new ArrayList<String>(); - expected.add("6"); - expected.add("22"); - expected.add("27"); - expected.add("ace"); - expected.add("bd"); - expected.add("g"); - expected.add("fh"); - expected.add("i"); - - CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>( - new MyCoReduceFunction(), 4L, 3L, 4L, 3L, keySelector0, keySelector1, - new MyTimeStamp<Tuple2<String, Integer>>(timestamps1), - new MyTimeStamp<Tuple2<String, String>>(timestamps2)); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - } - - @Test - public void coGroupedWindowReduceTest2() { - - List<Long> timestamps1 = Arrays.asList(0L, 0L, 1L, 2L, 2L, 3L, 4L, 4L, 5L, 6L); - List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>(); - inputs1.add(new Tuple2<String, Integer>("a", 1)); - inputs1.add(new Tuple2<String, Integer>("a", 2)); - inputs1.add(new Tuple2<String, Integer>("a", 3)); - inputs1.add(new Tuple2<String, Integer>("a", 4)); - inputs1.add(new Tuple2<String, Integer>("a", 5)); - inputs1.add(new Tuple2<String, Integer>("b", 6)); - inputs1.add(new Tuple2<String, Integer>("a", 7)); - inputs1.add(new Tuple2<String, Integer>("b", 8)); - inputs1.add(new Tuple2<String, Integer>("b", 9)); - inputs1.add(new Tuple2<String, Integer>("b", 10)); - - List<Long> timestamps2 = Arrays.asList(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L); - List<Tuple2<String, String>> inputs2 = new ArrayList<Tuple2<String, String>>(); - inputs2.add(new Tuple2<String, String>("1", "a")); - inputs2.add(new Tuple2<String, String>("2", "b")); - inputs2.add(new Tuple2<String, String>("1", "c")); - inputs2.add(new Tuple2<String, String>("2", "d")); - inputs2.add(new Tuple2<String, String>("1", "e")); - inputs2.add(new Tuple2<String, String>("2", "f")); - inputs2.add(new Tuple2<String, String>("1", "g")); - inputs2.add(new Tuple2<String, String>("2", "h")); - inputs2.add(new Tuple2<String, String>("1", "i")); - - List<String> expected = new ArrayList<String>(); - expected.add("15"); - expected.add("6"); - expected.add("16"); - expected.add("23"); - expected.add("7"); - expected.add("27"); - expected.add("ace"); - expected.add("bdf"); - expected.add("egi"); - expected.add("fh"); - - CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, String>, String>( - new MyCoReduceFunction(), 4L, 3L, 2L, 2L, keySelector0, keySelector1, - new MyTimeStamp<Tuple2<String, Integer>>(timestamps1), - new MyTimeStamp<Tuple2<String, String>>(timestamps2)); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fcd28fcc/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java deleted file mode 100644 index 035a021..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.operator; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; -import org.apache.flink.streaming.util.MockCoContext; -import org.junit.Test; - -public class CoWindowReduceTest { - - private static class MyCoReduceFunction implements CoReduceFunction<Integer, String, String> { - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce1(Integer value1, Integer value2) { - return value1 + value2; - } - - @Override - public String reduce2(String value1, String value2) { - return value1 + value2; - } - - @Override - public String map1(Integer value) { - return value.toString(); - } - - @Override - public String map2(String value) { - return value; - } - } - - public static final class MyTimeStamp<T> extends TimestampWrapper<T> { - private static final long serialVersionUID = 1L; - - private Iterator<Long> timestamps; - private long start; - - public MyTimeStamp(List<Long> timestamps) { - super(null, 0); - this.timestamps = timestamps.iterator(); - this.start = timestamps.get(0); - } - - @Override - public long getTimestamp(T value) { - long ts = timestamps.next(); - return ts; - } - - @Override - public long getStartTime() { - return start; - } - } - - @Test - public void coWindowReduceTest1() { - - List<Integer> inputs = new ArrayList<Integer>(); - for (Integer i = 1; i <= 10; i++) { - inputs.add(i); - } - - List<String> inputs2 = new ArrayList<String>(); - inputs2.add("a"); - inputs2.add("b"); - inputs2.add("c"); - inputs2.add("d"); - inputs2.add("e"); - inputs2.add("f"); - inputs2.add("g"); - inputs2.add("h"); - inputs2.add("i"); - - List<Long> timestamps1 = Arrays.asList(0L, 2L, 3L, 5L, 7L, 9L, 10L, 11L, 11L, 13L); - List<Long> timestamps2 = Arrays.asList(0L, 1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L); - - CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer, String, String>( - new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new MyTimeStamp<Integer>(timestamps1), - new MyTimeStamp<String>(timestamps2)); - - List<String> expected = new ArrayList<String>(); - expected.add("6"); - expected.add("9"); - expected.add("30"); - expected.add("10"); - expected.add("abcde"); - expected.add("fghi"); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - - } - - @Test - public void coWindowReduceTest2() { - - List<Integer> inputs = new ArrayList<Integer>(); - for (Integer i = 1; i <= 10; i++) { - inputs.add(i); - } - - List<String> inputs2 = new ArrayList<String>(); - inputs2.add("a"); - inputs2.add("b"); - inputs2.add("c"); - inputs2.add("d"); - inputs2.add("e"); - inputs2.add("f"); - inputs2.add("g"); - inputs2.add("h"); - inputs2.add("i"); - - List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 3L, 8L, 10L, 11L); - List<Long> timestamps2 = Arrays.asList(1L, 2L, 4L, 5L, 6L, 9L, 10L, 11L, 13L); - - CoWindowReduceInvokable<Integer, String, String> invokable = new CoWindowReduceInvokable<Integer, String, String>( - new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new MyTimeStamp<Integer>(timestamps1), - new MyTimeStamp<String>(timestamps2)); - - List<String> expected = new ArrayList<String>(); - expected.add("28"); - expected.add("18"); - expected.add("8"); - expected.add("27"); - expected.add("ab"); - expected.add("cd"); - expected.add("de"); - expected.add("f"); - expected.add("fgh"); - expected.add("hi"); - - List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2); - - Collections.sort(result); - Collections.sort(expected); - assertEquals(expected, result); - - } - -}
