http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
new file mode 100644
index 0000000..1bfd1d5
--- /dev/null
+++
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import
org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class WindowOperatorTest {
+
+ @SuppressWarnings("unchecked,rawtypes")
+ private WindowBufferFactory windowBufferFactory;
+
+ public WindowOperatorTest(WindowBufferFactory<?, ?>
windowBufferFactory) {
+ this.windowBufferFactory = windowBufferFactory;
+ }
+
+ // For counting if close() is called the correct number of times on the
SumReducer
+ private static AtomicInteger closeCalled = new AtomicInteger(0);
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSlidingEventTimeWindows() throws Exception {
+ closeCalled.set(0);
+
+ final int WINDOW_SIZE = 3000;
+ final int WINDOW_SLIDE = 1000;
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, TimeWindow> operator = new WindowOperator<>(
+ SlidingTimeWindows.of(WINDOW_SIZE,
WINDOW_SLIDE),
+ new TupleKeySelector(),
+ windowBufferFactory,
+ new ReduceWindowFunction<String, TimeWindow,
Tuple2<String, Integer>>(new SumReducer()),
+ WatermarkTrigger.create());
+
+ operator.setInputType(TypeInfoParser.<Tuple2<String,
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new
OneInputStreamOperatorTestHarness<>(operator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3000));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
initialTime + 999));
+ expectedOutput.add(new Watermark(999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 1999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
initialTime + 1999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3),
initialTime + 1999));
+ expectedOutput.add(new Watermark(1999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
initialTime + 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3),
initialTime + 2999));
+ expectedOutput.add(new Watermark(2999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5),
initialTime + 3999));
+ expectedOutput.add(new Watermark(3999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 4999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2),
initialTime + 4999));
+ expectedOutput.add(new Watermark(4999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 5999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2),
initialTime + 5999));
+ expectedOutput.add(new Watermark(5999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ // those don't have any effect...
+ testHarness.processWatermark(new Watermark(initialTime + 6999));
+ testHarness.processWatermark(new Watermark(initialTime + 7999));
+ expectedOutput.add(new Watermark(6999));
+ expectedOutput.add(new Watermark(7999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ if (windowBufferFactory instanceof
PreAggregatingHeapWindowBuffer.Factory) {
+ Assert.assertEquals("Close was not called.", 2,
closeCalled.get());
+ } else {
+ Assert.assertEquals("Close was not called.", 1,
closeCalled.get());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTumblingEventTimeWindows() throws Exception {
+ closeCalled.set(0);
+
+ final int WINDOW_SIZE = 3000;
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, TimeWindow> operator = new WindowOperator<>(
+ TumblingTimeWindows.of(WINDOW_SIZE),
+ new TupleKeySelector(),
+ windowBufferFactory,
+ new ReduceWindowFunction<String, TimeWindow,
Tuple2<String, Integer>>(new SumReducer()),
+ WatermarkTrigger.create());
+
+ operator.setInputType(TypeInfoParser.<Tuple2<String,
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new
OneInputStreamOperatorTestHarness<>(operator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3000));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 999));
+ expectedOutput.add(new Watermark(999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 1999));
+ expectedOutput.add(new Watermark(1999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
initialTime + 2999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3),
initialTime + 2999));
+ expectedOutput.add(new Watermark(2999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 3999));
+ expectedOutput.add(new Watermark(3999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 4999));
+ expectedOutput.add(new Watermark(4999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 5999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2),
initialTime + 5999));
+ expectedOutput.add(new Watermark(5999));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ // those don't have any effect...
+ testHarness.processWatermark(new Watermark(initialTime + 6999));
+ testHarness.processWatermark(new Watermark(initialTime + 7999));
+ expectedOutput.add(new Watermark(6999));
+ expectedOutput.add(new Watermark(7999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ if (windowBufferFactory instanceof
PreAggregatingHeapWindowBuffer.Factory) {
+ Assert.assertEquals("Close was not called.", 2,
closeCalled.get());
+ } else {
+ Assert.assertEquals("Close was not called.", 1,
closeCalled.get());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testContinuousWatermarkTrigger() throws Exception {
+ closeCalled.set(0);
+
+ final int WINDOW_SIZE = 3000;
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, GlobalWindow> operator = new WindowOperator<>(
+ GlobalWindows.create(),
+ new TupleKeySelector(),
+ windowBufferFactory,
+ new ReduceWindowFunction<String, GlobalWindow,
Tuple2<String, Integer>>(new SumReducer()),
+ ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+
+ operator.setInputType(TypeInfoParser.<Tuple2<String,
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new
OneInputStreamOperatorTestHarness<>(operator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // The global window actually ignores these timestamps...
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime));
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 1000));
+ expectedOutput.add(new Watermark(1000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ testHarness.processWatermark(new Watermark(initialTime + 2000));
+ expectedOutput.add(new Watermark(2000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 3000));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
Long.MAX_VALUE));
+ expectedOutput.add(new Watermark(3000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 4000));
+ expectedOutput.add(new Watermark(4000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 5000));
+ expectedOutput.add(new Watermark(5000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processWatermark(new Watermark(initialTime + 6000));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3),
Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5),
Long.MAX_VALUE));
+ expectedOutput.add(new Watermark(6000));
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+ // those don't have any effect...
+ testHarness.processWatermark(new Watermark(initialTime + 7000));
+ testHarness.processWatermark(new Watermark(initialTime + 8000));
+ expectedOutput.add(new Watermark(7000));
+ expectedOutput.add(new Watermark(8000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ if (windowBufferFactory instanceof
PreAggregatingHeapWindowBuffer.Factory) {
+ Assert.assertEquals("Close was not called.", 2,
closeCalled.get());
+ } else {
+ Assert.assertEquals("Close was not called.", 1,
closeCalled.get());
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCountTrigger() throws Exception {
+ closeCalled.set(0);
+
+ final int WINDOW_SIZE = 4;
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, GlobalWindow> operator = new WindowOperator<>(
+ GlobalWindows.create(),
+ new TupleKeySelector(),
+ windowBufferFactory,
+ new ReduceWindowFunction<String, GlobalWindow,
Tuple2<String, Integer>>(new SumReducer()),
+
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
+
+ operator.setInputType(TypeInfoParser.<Tuple2<String,
Integer>>parse(
+ "Tuple2<String, Integer>"), new
ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new
OneInputStreamOperatorTestHarness<>(operator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ // The global window actually ignores these timestamps...
+
+ // add elements out-of-order
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3000));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 3999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 20));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1999));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4),
Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key1", 1), initialTime + 10999));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1000));
+
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4),
Long.MAX_VALUE));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4),
Long.MAX_VALUE));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+ testHarness.close();
+ if (windowBufferFactory instanceof
PreAggregatingHeapWindowBuffer.Factory) {
+ Assert.assertEquals("Close was not called.", 2,
closeCalled.get());
+ } else {
+ Assert.assertEquals("Close was not called.", 1,
closeCalled.get());
+ }
+
+ }
+
+ //
------------------------------------------------------------------------
+ // UDFs
+ //
------------------------------------------------------------------------
+
+ public static class SumReducer extends
RichReduceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private boolean openCalled = false;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ openCalled = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ closeCalled.incrementAndGet();
+ }
+
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer>
value1,
+ Tuple2<String, Integer> value2) throws
Exception {
+ if (!openCalled) {
+ Assert.fail("Open was not called");
+ }
+ return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+ }
+ }
+ //
------------------------------------------------------------------------
+ // Parametrization for testing different window buffers
+ //
------------------------------------------------------------------------
+
+ @Parameterized.Parameters(name = "WindowBuffer = {0}")
+ @SuppressWarnings("unchecked,rawtypes")
+ public static Collection<WindowBufferFactory[]> windowBuffers(){
+ return Arrays.asList(new WindowBufferFactory[]{new
PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
+ new WindowBufferFactory[]{new
HeapWindowBuffer.Factory()}
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private static class ResultSortComparator implements Comparator<Object>
{
+ @Override
+ public int compare(Object o1, Object o2) {
+ if (o1 instanceof Watermark || o2 instanceof Watermark)
{
+ return 0;
+ } else {
+ StreamRecord<Tuple2<String, Integer>> sr0 =
(StreamRecord<Tuple2<String, Integer>>) o1;
+ StreamRecord<Tuple2<String, Integer>> sr1 =
(StreamRecord<Tuple2<String, Integer>>) o2;
+ if (sr0.getTimestamp() != sr1.getTimestamp()) {
+ return (int) (sr0.getTimestamp() -
sr1.getTimestamp());
+ }
+ int comparison =
sr0.getValue().f0.compareTo(sr1.getValue().f0);
+ if (comparison != 0) {
+ return comparison;
+ } else {
+ return sr0.getValue().f1 -
sr1.getValue().f1;
+ }
+ }
+ }
+ }
+
+ private static class TupleKeySelector implements
KeySelector<Tuple2<String, Integer>, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Tuple2<String, Integer> value) throws
Exception {
+ return value.f0;
+ }
+ }
+}