Repository: flink Updated Branches: refs/heads/master 64e1dc6fe -> b8f58fab5
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.runtime.operators.windows;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.TriggerTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

import org.junit.After;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests for the {@code AggregatingProcessingTimeWindowOperator}: parameter validation,
 * pane/window arithmetic, trigger alignment, tumbling and sliding emission, trailing data
 * on close, and exception propagation from both the trigger and the element path.
 *
 * <p>Unexpected exceptions are allowed to propagate out of the test methods so that the
 * test runner reports them with their full stack trace. Operators that were opened are
 * disposed in a {@code finally} block so that a failing assertion does not leave trigger
 * threads behind for {@link #checkNoTriggerThreadsRunning()} to trip over.
 */
@SuppressWarnings("serial")
public class AggregatingAlignedProcessingTimeWindowOperatorTest {

    @SuppressWarnings("unchecked")
    private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);

    @SuppressWarnings("unchecked")
    private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);

    /** Key selector that uses the element itself as its key. */
    private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
        @Override
        public Integer getKey(Integer value) {
            return value;
        }
    };

    /** Reduce function that adds the two values. */
    private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2) {
            return value1 + value2;
        }
    };

    // ------------------------------------------------------------------------

    /**
     * Waits up to five seconds for all threads in the trigger thread group to terminate
     * and fails the test if any are still active afterwards.
     */
    @After
    public void checkNoTriggerThreadsRunning() {
        // make sure that all the threads we trigger are shut down
        final long deadline = System.currentTimeMillis() + 5000;
        boolean interrupted = false;

        while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                // keep waiting for the deadline, but remember the interruption
                interrupted = true;
            }
        }

        if (interrupted) {
            // restore the flag so that the test runner can still observe the interruption
            Thread.currentThread().interrupt();
        }

        assertEquals("Not all trigger threads were properly shut down",
                0, TriggerTimer.TRIGGER_THREADS_GROUP.activeCount());
    }

    // ------------------------------------------------------------------------

    @Test
    public void testInvalidParameters() {
        assertInvalidParameter(-1L, -1L);
        assertInvalidParameter(10000L, -1L);
        assertInvalidParameter(-1L, 1000L);
        assertInvalidParameter(1000L, 2000L);

        // actual internal slide is too low here:
        assertInvalidParameter(1000L, 999L);
    }

    @Test
    public void testWindowSizeAndSlide() throws Exception {
        assertPaneLayout(5000, 1000, 1000, 5);
        assertPaneLayout(1000, 1000, 1000, 1);
        assertPaneLayout(1500, 1000, 500, 3);
        assertPaneLayout(1200, 1100, 100, 12);
    }

    @Test
    public void testWindowTriggerTimeAlignment() throws Exception {
        @SuppressWarnings("unchecked")
        final Output<StreamRecord<String>> mockOut = mock(Output.class);
        final StreamingRuntimeContext mockContext = createMockContext();

        assertTriggerAlignment(mockOut, mockContext, 5000, 1000, 1000);
        assertTriggerAlignment(mockOut, mockContext, 1000, 1000, 1000);
        assertTriggerAlignment(mockOut, mockContext, 1500, 1000, 500);
        assertTriggerAlignment(mockOut, mockContext, 1200, 1100, 100);
    }

    @Test
    public void testTumblingWindowUniqueElements() throws Exception {
        final int windowSize = 50;
        final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);

        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(
                        sumFunction, identitySelector, windowSize, windowSize);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        final int numElements = 1000;

        try {
            for (int i = 0; i < numElements; i++) {
                op.processElement(new StreamRecord<Integer>(i));
                Thread.sleep(1);
            }

            op.close();
        }
        finally {
            op.dispose();
        }

        // get and verify the result
        List<Integer> result = out.getElements();
        assertEquals(numElements, result.size());

        Collections.sort(result);
        for (int i = 0; i < numElements; i++) {
            assertEquals(i, result.get(i).intValue());
        }
    }

    @Test
    public void testTumblingWindowDuplicateElements() throws Exception {
        final int windowSize = 50;
        final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);

        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(
                        sumFunction, identitySelector, windowSize, windowSize);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        final int numWindows = 10;

        try {
            long previousNextTime = 0;
            int window = 1;

            while (window <= numWindows) {
                long nextTime = op.getNextEvaluationTime();

                // fold the evaluation time into an int, so all elements of one window share a key
                int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));

                op.processElement(new StreamRecord<Integer>(val));

                if (nextTime != previousNextTime) {
                    window++;
                    previousNextTime = nextTime;
                }

                Thread.sleep(1);
            }

            op.close();
        }
        finally {
            op.dispose();
        }

        List<Integer> result = out.getElements();

        // we have ideally one element per window. we may have more, when we emitted a value into the
        // successive window (corner case), so we can have twice the number of elements, in the worst case.
        assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);

        // deduplicate for more accurate checks
        HashSet<Integer> set = new HashSet<>(result);
        assertTrue(set.size() == numWindows || set.size() == numWindows + 1);
    }

    @Test
    public void testSlidingWindow() throws Exception {
        final CollectingOutput<Integer> out = new CollectingOutput<>(50);

        // sliding window of 150 milliseconds that triggers every 50 milliseconds
        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        final int numElements = 1000;

        try {
            for (int i = 0; i < numElements; i++) {
                op.processElement(new StreamRecord<Integer>(i));
                Thread.sleep(1);
            }

            op.close();
        }
        finally {
            op.dispose();
        }

        // get and verify the result
        List<Integer> result = out.getElements();

        // every element can occur between one and three times
        if (result.size() < numElements || result.size() > 3 * numElements) {
            System.out.println(result);
            fail("Wrong number of results: " + result.size());
        }

        Collections.sort(result);
        int lastNum = -1;
        int lastCount = -1;

        for (int num : result) {
            if (num == lastNum) {
                lastCount++;
                assertTrue(lastCount <= 3);
            }
            else {
                lastNum = num;
                lastCount = 1;
            }
        }
    }

    @Test
    public void testSlidingWindowSingleElements() throws Exception {
        final CollectingOutput<Integer> out = new CollectingOutput<>(50);

        // sliding window of 150 milliseconds that triggers every 50 milliseconds
        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        try {
            op.processElement(new StreamRecord<Integer>(1));
            op.processElement(new StreamRecord<Integer>(2));

            // each element should end up in the output three times
            // wait until the elements have arrived 6 times in the output
            out.waitForNElements(6, 120000);

            List<Integer> result = out.getElements();
            assertEquals(6, result.size());

            Collections.sort(result);
            assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);

            op.close();
        }
        finally {
            op.dispose();
        }
    }

    @Test
    public void testEmitTrailingDataOnClose() throws Exception {
        final CollectingOutput<Integer> out = new CollectingOutput<>();

        // the operator has a window time that is so long that it will not fire in this test
        final long oneYear = 365L * 24 * 60 * 60 * 1000;
        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        try {
            for (Integer i : data) {
                op.processElement(new StreamRecord<Integer>(i));
            }

            op.close();
        }
        finally {
            op.dispose();
        }

        // get and verify the result
        List<Integer> result = out.getElements();
        Collections.sort(result);
        assertEquals(data, result);
    }

    @Test
    public void testPropagateExceptionsFromTrigger() throws Exception {
        final CollectingOutput<Integer> out = new CollectingOutput<>();

        ReduceFunction<Integer> failingFunction = new FailingFunction(100);

        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 200, 50);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        try {
            long nextWindowTime = op.getNextEvaluationTime();
            int val = 0;
            for (int num = 0; num < Integer.MAX_VALUE; num++) {
                op.processElement(new StreamRecord<Integer>(val++));
                Thread.sleep(1);

                // when the window has advanced, reset the value, to generate the same values
                // in the next pane again. This causes the aggregation on trigger to reduce values
                if (op.getNextEvaluationTime() != nextWindowTime) {
                    nextWindowTime = op.getNextEvaluationTime();
                    val = 0;
                }
            }
            fail("This should really have failed with an exception quite a while ago...");
        }
        catch (Exception e) {
            // the failure of the reduce function is expected to arrive as the cause
            assertNotNull(e.getCause());
            assertTrue(e.getCause().getMessage().contains("Artificial Test Exception"));
        }
        finally {
            op.dispose();
        }
    }

    @Test
    public void testPropagateExceptionsFromProcessElement() throws Exception {
        final CollectingOutput<Integer> out = new CollectingOutput<>();

        ReduceFunction<Integer> failingFunction = new FailingFunction(100);

        // the operator has a window time that is so long that it will not fire in this test
        final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
        AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
                new AggregatingProcessingTimeWindowOperator<>(
                        failingFunction, identitySelector, hundredYears, hundredYears);

        op.setup(out, createMockContext());
        op.open(new Configuration());

        try {
            for (int i = 0; i < 100; i++) {
                op.processElement(new StreamRecord<Integer>(1));
            }

            try {
                op.processElement(new StreamRecord<Integer>(1));
                fail("This should fail with an exception");
            }
            catch (Exception e) {
                assertTrue(e.getMessage().contains("Artificial Test Exception"));
            }
        }
        finally {
            op.dispose();
        }
    }

    // ------------------------------------------------------------------------

    /** Creates a mocked runtime context that only answers the task name. */
    private static StreamingRuntimeContext createMockContext() {
        final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
        when(mockContext.getTaskName()).thenReturn("Test task name");
        return mockContext;
    }

    /**
     * Builds an operator with the given size and slide and checks the values it reports
     * for window size, window slide, pane size and number of panes per window.
     */
    private void assertPaneLayout(
            long windowSize, long windowSlide, long expectedPaneSize, long expectedNumPanes) throws Exception {

        AbstractAlignedProcessingTimeWindowOperator<String, String, String> op =
                new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, windowSize, windowSlide);

        assertEquals(windowSize, op.getWindowSize());
        assertEquals(windowSlide, op.getWindowSlide());
        assertEquals(expectedPaneSize, op.getPaneSize());
        assertEquals(expectedNumPanes, op.getNumPanesPerWindow());
    }

    /**
     * Opens an operator with the given size and slide and checks that its next slide time
     * is a multiple of the pane size and its next evaluation time a multiple of the slide.
     */
    private void assertTriggerAlignment(
            Output<StreamRecord<String>> out, StreamingRuntimeContext context,
            long windowSize, long windowSlide, long paneSize) throws Exception {

        AbstractAlignedProcessingTimeWindowOperator<String, String, String> op =
                new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, windowSize, windowSlide);

        op.setup(out, context);
        op.open(new Configuration());
        try {
            assertTrue(op.getNextSlideTime() % paneSize == 0);
            assertTrue(op.getNextEvaluationTime() % windowSlide == 0);
        }
        finally {
            op.dispose();
        }
    }

    /** Checks that constructing an operator with the given parameters is rejected. */
    private void assertInvalidParameter(long windowSize, long windowSlide) {
        try {
            new AggregatingProcessingTimeWindowOperator<String, String>(
                    mockFunction, mockKeySelector, windowSize, windowSlide);
            fail("This should fail with an IllegalArgumentException");
        }
        catch (IllegalArgumentException e) {
            // expected
        }
        catch (Exception e) {
            fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
        }
    }

    // ------------------------------------------------------------------------

    /**
     * Summing reduce function that starts throwing once it has been invoked
     * {@code failAfterElements} times.
     */
    private static class FailingFunction implements ReduceFunction<Integer> {

        private final int failAfterElements;

        private int numElements;

        FailingFunction(int failAfterElements) {
            this.failAfterElements = failAfterElements;
        }

        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            numElements++;

            if (numElements >= failAfterElements) {
                // the tests match on this exact message
                throw new Exception("Artificial Test Exception");
            }

            return value1 + value2;
        }
    }
}
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java new file mode 100644 index 0000000..9f6858d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.runtime.operators.windows;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

/**
 * Test output that stores the value of every collected record and lets a test wait
 * until a given number of values has arrived.
 *
 * <p>Records may be collected on one thread while a test waits for or reads them on
 * another, so every access to the backing list happens while holding its monitor.
 *
 * @param <T> type of the record values
 */
public class CollectingOutput<T> implements Output<StreamRecord<T>> {

    /** Collected values; guarded by its own monitor. */
    private final List<T> elements = new ArrayList<>();

    /** Every record timestamp must be a multiple of this value; zero disables the check. */
    private final int timeStampModulus;


    /** Creates an output that does not check record timestamps. */
    public CollectingOutput() {
        this.timeStampModulus = 0;
    }

    /**
     * Creates an output that rejects records whose timestamp is not a multiple of the
     * given modulus.
     *
     * @param timeStampModulus the required timestamp divisor, or zero to disable the check
     */
    public CollectingOutput(int timeStampModulus) {
        this.timeStampModulus = timeStampModulus;
    }

    // ------------------------------------------------------------------------

    /**
     * Returns the values collected so far.
     *
     * @return a new mutable list holding a snapshot of the collected values, which the
     *         caller may sort or modify without affecting this output
     */
    public List<T> getElements() {
        synchronized (elements) {
            return new ArrayList<>(elements);
        }
    }

    /**
     * Blocks until at least {@code n} values have been collected or the timeout elapsed.
     * Returns normally in both cases; callers check the number of elements afterwards.
     *
     * @param n the number of values to wait for
     * @param timeout the maximum time to wait, in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitForNElements(int n, long timeout) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeout;
        synchronized (elements) {
            long now;
            // the loop condition keeps the remaining time strictly positive, so this never
            // degrades into the unbounded wait(0)
            while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
                elements.wait(deadline - now);
            }
        }
    }

    // ------------------------------------------------------------------------

    @Override
    public void emitWatermark(Watermark mark) {
        throw new UnsupportedOperationException("the output should not emit watermarks");
    }

    /**
     * Stores the record's value and wakes up waiting threads.
     *
     * @throws IllegalArgumentException if a timestamp modulus is configured and the record's
     *         timestamp is not a multiple of it; the value has been stored at that point
     */
    @Override
    public void collect(StreamRecord<T> record) {
        // add and notify under the same monitor that the waiting and reading side uses
        synchronized (elements) {
            elements.add(record.getValue());
            elements.notifyAll();
        }

        if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
            throw new IllegalArgumentException("Invalid timestamp");
        }
    }

    @Override
    public void close() {}
}
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java new file mode 100644 index 0000000..2a9e203 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class KeyMapPutIfAbsentTest { + + @Test + public void testPutIfAbsentUniqueKeysAndGrowth() { + try { + KeyMap<Integer, Integer> map = new KeyMap<>(); + IntegerFactory factory = new IntegerFactory(); + + final int numElements = 1000000; + + for (int i = 0; i < numElements; i++) { + factory.set(2 * i + 1); + map.putIfAbsent(i, factory); + + assertEquals(i+1, map.size()); + assertTrue(map.getCurrentTableCapacity() > map.size()); + assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold()); + assertTrue(map.size() <= map.getRehashThreshold()); + } + + assertEquals(numElements, map.size()); + assertEquals(numElements, map.traverseAndCountElements()); + assertEquals(1 << 21, map.getCurrentTableCapacity()); + + for (int i = 0; i < numElements; i++) { + assertEquals(2 * i + 1, map.get(i).intValue()); + } + + for (int i = numElements - 1; i >= 0; i--) { + assertEquals(2 * i + 1, map.get(i).intValue()); + } + + assertEquals(numElements, map.size()); + assertEquals(numElements, map.traverseAndCountElements()); + assertEquals(1 << 21, map.getCurrentTableCapacity()); + assertTrue(map.getLongestChainLength() <= 7); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPutIfAbsentDuplicateKeysAndGrowth() { + try { + KeyMap<Integer, Integer> map = new KeyMap<>(); + IntegerFactory factory = new IntegerFactory(); + + final int numElements = 1000000; + + for (int i = 0; i < numElements; i++) { + int val = 2 * i + 1; + factory.set(val); + Integer put = map.putIfAbsent(i, factory); + assertEquals(val, put.intValue()); + } + + for (int i = 0; i < numElements; i += 3) { + factory.set(2 * i); + Integer put = map.putIfAbsent(i, factory); + assertEquals(2 * i + 1, put.intValue()); + } + + for 
(int i = 0; i < numElements; i++) { + assertEquals(2 * i + 1, map.get(i).intValue()); + } + + assertEquals(numElements, map.size()); + assertEquals(numElements, map.traverseAndCountElements()); + assertEquals(1 << 21, map.getCurrentTableCapacity()); + assertTrue(map.getLongestChainLength() <= 7); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + + private static class IntegerFactory implements KeyMap.LazyFactory<Integer> { + + private Integer toCreate; + + public void set(Integer toCreate) { + this.toCreate = toCreate; + } + + @Override + public Integer create() { + return toCreate; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java new file mode 100644 index 0000000..7335976 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.runtime.operators.windows;

import org.junit.Test;

import java.util.BitSet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
 * Tests {@code KeyMap.put} with unique and with duplicate keys while the table grows,
 * including a full iteration over the resulting entries.
 *
 * <p>Unexpected exceptions propagate out of the test methods so that the test runner
 * reports them with their full stack trace.
 */
public class KeyMapPutTest {

    /** Number of distinct keys inserted by each test. */
    private static final int NUM_ELEMENTS = 1000000;

    /** Table capacity the tests expect after inserting {@link #NUM_ELEMENTS} keys. */
    private static final int EXPECTED_CAPACITY = 1 << 21;

    /** Longest hash chain the tests tolerate after all insertions. */
    private static final int MAX_CHAIN_LENGTH = 7;

    @Test
    public void testPutUniqueKeysAndGrowth() throws Exception {
        KeyMap<Integer, Integer> map = new KeyMap<>();

        for (int i = 0; i < NUM_ELEMENTS; i++) {
            map.put(i, 2 * i + 1);

            // size and threshold relations must hold after every single insertion
            assertEquals(i + 1, map.size());
            assertTrue(map.getCurrentTableCapacity() > map.size());
            assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
            assertTrue(map.size() <= map.getRehashThreshold());
        }

        assertEquals(NUM_ELEMENTS, map.size());
        assertEquals(NUM_ELEMENTS, map.traverseAndCountElements());
        assertEquals(EXPECTED_CAPACITY, map.getCurrentTableCapacity());

        // look all keys up in ascending and then in descending order
        for (int i = 0; i < NUM_ELEMENTS; i++) {
            assertEquals(2 * i + 1, map.get(i).intValue());
        }

        for (int i = NUM_ELEMENTS - 1; i >= 0; i--) {
            assertEquals(2 * i + 1, map.get(i).intValue());
        }

        assertIteratesEachKeyOnce(map, false);

        assertEquals(NUM_ELEMENTS, map.size());
        assertEquals(NUM_ELEMENTS, map.traverseAndCountElements());
        assertEquals(EXPECTED_CAPACITY, map.getCurrentTableCapacity());
        assertTrue(map.getLongestChainLength() <= MAX_CHAIN_LENGTH);
    }

    @Test
    public void testPutDuplicateKeysAndGrowth() throws Exception {
        final KeyMap<Integer, Integer> map = new KeyMap<>();

        // first insertion of every key must not report a previous value
        for (int i = 0; i < NUM_ELEMENTS; i++) {
            Integer put = map.put(i, 2 * i + 1);
            assertNull(put);
        }

        // overwrite every third key; the replaced value must be returned
        for (int i = 0; i < NUM_ELEMENTS; i += 3) {
            Integer put = map.put(i, 2 * i);
            assertNotNull(put);
            assertEquals(2 * i + 1, put.intValue());
        }

        for (int i = 0; i < NUM_ELEMENTS; i++) {
            assertEquals(expectedValue(i, true), map.get(i).intValue());
        }

        assertEquals(NUM_ELEMENTS, map.size());
        assertEquals(NUM_ELEMENTS, map.traverseAndCountElements());
        assertEquals(EXPECTED_CAPACITY, map.getCurrentTableCapacity());
        assertTrue(map.getLongestChainLength() <= MAX_CHAIN_LENGTH);

        assertIteratesEachKeyOnce(map, true);
    }

    // ------------------------------------------------------------------------

    /**
     * Returns the value the tests store under the given key: {@code 2 * key + 1}, or
     * {@code 2 * key} for keys divisible by three when those were overwritten.
     */
    private static int expectedValue(int key, boolean everyThirdOverwritten) {
        return (everyThirdOverwritten && key % 3 == 0) ? (2 * key) : (2 * key + 1);
    }

    /**
     * Iterates over the map and checks that every entry carries the expected value, that no
     * key shows up twice, and that exactly {@link #NUM_ELEMENTS} entries are visited.
     */
    private static void assertIteratesEachKeyOnce(KeyMap<Integer, Integer> map, boolean everyThirdOverwritten) {
        BitSet seenKeys = new BitSet();
        int numContained = 0;

        for (KeyMap.Entry<Integer, Integer> entry : map) {
            numContained++;

            int key = entry.getKey();

            assertEquals(expectedValue(key, everyThirdOverwritten), entry.getValue().intValue());
            assertFalse(seenKeys.get(key));
            seenKeys.set(key);
        }

        assertEquals(NUM_ELEMENTS, numContained);
        assertEquals(NUM_ELEMENTS, seenKeys.cardinality());
    }
}
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Random; + +import static org.junit.Assert.*; + +public class KeyMapTest { + + @Test + public void testInitialSizeComputation() { + try { + KeyMap<String, String> map; + + map = new KeyMap<>(); + assertEquals(64, map.getCurrentTableCapacity()); + assertEquals(6, map.getLog2TableCapacity()); + assertEquals(24, map.getShift()); + assertEquals(48, map.getRehashThreshold()); + + map = new KeyMap<>(0); + assertEquals(64, map.getCurrentTableCapacity()); + assertEquals(6, map.getLog2TableCapacity()); + assertEquals(24, map.getShift()); + assertEquals(48, map.getRehashThreshold()); + + map = new KeyMap<>(1); + assertEquals(64, map.getCurrentTableCapacity()); + assertEquals(6, map.getLog2TableCapacity()); + assertEquals(24, map.getShift()); + assertEquals(48, map.getRehashThreshold()); + + map = new KeyMap<>(9); + assertEquals(64, map.getCurrentTableCapacity()); + assertEquals(6, map.getLog2TableCapacity()); + assertEquals(24, map.getShift()); + assertEquals(48, map.getRehashThreshold()); + + map = new KeyMap<>(63); + assertEquals(64, map.getCurrentTableCapacity()); + assertEquals(6, map.getLog2TableCapacity()); + assertEquals(24, map.getShift()); + assertEquals(48, map.getRehashThreshold()); + + map = new KeyMap<>(64); + assertEquals(128, map.getCurrentTableCapacity()); + assertEquals(7, map.getLog2TableCapacity()); + assertEquals(23, map.getShift()); + assertEquals(96, map.getRehashThreshold()); + + map = new KeyMap<>(500); + assertEquals(512, map.getCurrentTableCapacity()); + assertEquals(9, map.getLog2TableCapacity()); + assertEquals(21, map.getShift()); + assertEquals(384, map.getRehashThreshold()); + + map = new KeyMap<>(127); + assertEquals(128, map.getCurrentTableCapacity()); + assertEquals(7, map.getLog2TableCapacity()); + assertEquals(23, map.getShift()); + assertEquals(96, 
map.getRehashThreshold()); + + // no negative number of elements + try { + new KeyMap<>(-1); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // check integer overflow + try { + map = new KeyMap<>(0x65715522); + + final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE); + assertEquals(Integer.highestOneBit(Integer.MAX_VALUE), map.getCurrentTableCapacity()); + assertEquals(30, map.getLog2TableCapacity()); + assertEquals(0, map.getShift()); + assertEquals(maxCap / 4 * 3, map.getRehashThreshold()); + } + catch (OutOfMemoryError e) { + // this may indeed happen in small test setups. we tolerate this in this test + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPutAndGetRandom() { + try { + final KeyMap<Integer, Integer> map = new KeyMap<>(); + final Random rnd = new Random(); + + final long seed = rnd.nextLong(); + final int numElements = 10000; + + final HashMap<Integer, Integer> groundTruth = new HashMap<>(); + + rnd.setSeed(seed); + for (int i = 0; i < numElements; i++) { + Integer key = rnd.nextInt(); + Integer value = rnd.nextInt(); + + if (rnd.nextBoolean()) { + groundTruth.put(key, value); + map.put(key, value); + } + } + + rnd.setSeed(seed); + for (int i = 0; i < numElements; i++) { + Integer key = rnd.nextInt(); + + // skip these, evaluating it is tricky due to duplicates + rnd.nextInt(); + rnd.nextBoolean(); + + Integer expected = groundTruth.get(key); + if (expected == null) { + assertNull(map.get(key)); + } + else { + Integer contained = map.get(key); + assertNotNull(contained); + assertEquals(expected, contained); + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testConjunctTraversal() { + try { + final Random rootRnd = new Random(654685486325439L); + + final int numMaps = 7; + final int numKeys = 1000000; + + // ------ create a set of maps ------ + 
@SuppressWarnings("unchecked") + final KeyMap<Integer, Integer>[] maps = (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps]; + for (int i = 0; i < numMaps; i++) { + maps[i] = new KeyMap<>(); + } + + // ------ prepare probabilities for maps ------ + final double[] probabilities = new double[numMaps]; + final double[] probabilitiesTemp = new double[numMaps]; + { + probabilities[0] = 0.5; + double remainingProb = 1.0 - probabilities[0]; + for (int i = 1; i < numMaps - 1; i++) { + remainingProb /= 2; + probabilities[i] = remainingProb; + } + + // compensate for rounding errors + probabilities[numMaps - 1] = remainingProb; + } + + // ------ generate random elements ------ + final long probSeed = rootRnd.nextLong(); + final long keySeed = rootRnd.nextLong(); + + final Random probRnd = new Random(probSeed); + final Random keyRnd = new Random(keySeed); + + final int maxStride = Integer.MAX_VALUE / numKeys; + + int totalNumElements = 0; + int nextKeyValue = 1; + + for (int i = 0; i < numKeys; i++) { + int numCopies = (nextKeyValue % 3) + 1; + System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps); + + double totalProb = 1.0; + for (int copy = 0; copy < numCopies; copy++) { + int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd); + totalProb -= probabilitiesTemp[pos]; + probabilitiesTemp[pos] = 0.0; + + Integer boxed = nextKeyValue; + Integer previous = maps[pos].put(boxed, boxed); + assertNull("Test problem - test does not assign unique maps", previous); + } + + totalNumElements += numCopies; + nextKeyValue += keyRnd.nextInt(maxStride) + 1; + } + + + // check that all maps contain the total number of elements + int numContained = 0; + for (KeyMap<?, ?> map : maps) { + numContained += map.size(); + } + assertEquals(totalNumElements, numContained); + + // ------ check that all elements can be found in the maps ------ + keyRnd.setSeed(keySeed); + + numContained = 0; + nextKeyValue = 1; + for (int i = 0; i < numKeys; i++) { + int 
numCopiesExpected = (nextKeyValue % 3) + 1; + int numCopiesContained = 0; + + for (KeyMap<Integer, Integer> map : maps) { + Integer val = map.get(nextKeyValue); + if (val != null) { + assertEquals(nextKeyValue, val.intValue()); + numCopiesContained++; + } + } + + assertEquals(numCopiesExpected, numCopiesContained); + numContained += numCopiesContained; + + nextKeyValue += keyRnd.nextInt(maxStride) + 1; + } + assertEquals(totalNumElements, numContained); + + // ------ make a traversal over all keys and validate the keys in the traversal ------ + final int[] keysStartedAndFinished = { 0, 0 }; + KeyMap.TraversalEvaluator<Integer, Integer> traversal = new KeyMap.TraversalEvaluator<Integer, Integer>() { + + private int key; + private int valueCount; + + @Override + public void startNewKey(Integer key) { + this.key = key; + this.valueCount = 0; + + keysStartedAndFinished[0]++; + } + + @Override + public void nextValue(Integer value) { + assertEquals(this.key, value.intValue()); + this.valueCount++; + } + + @Override + public void keyDone() { + int expected = (key % 3) + 1; + if (expected != valueCount) { + fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount); + } + + keysStartedAndFinished[1]++; + } + }; + + KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17); + + assertEquals(numKeys, keysStartedAndFinished[0]); + assertEquals(numKeys, keysStartedAndFinished[1]); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSizeComparator() { + try { + KeyMap<String, String> map1 = new KeyMap<>(5); + KeyMap<String, String> map2 = new KeyMap<>(80); + + assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity()); + + assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0); + assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0); + assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 
0); + assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + + private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) { + double val = rnd.nextDouble() * totalProbability; + + double accum = 0; + for (int i = 0; i < array.length; i++) { + accum += array[i]; + if (val <= accum && array[i] > 0.0) { + return i; + } + } + + // in case of rounding errors + return array.length - 1; + } + + private static <E> E[] shuffleArray(E[] array, Random rnd) { + E[] target = Arrays.copyOf(array, array.length); + + for (int i = target.length - 1; i > 0; i--) { + int swapPos = rnd.nextInt(i + 1); + E temp = target[i]; + target[i] = target[swapPos]; + target[swapPos] = temp; + } + + return target; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java new file mode 100644 index 0000000..6e0724d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator; +import org.apache.flink.util.Collector; + +@SuppressWarnings("serial") +public class GroupedProcessingTimeWindowExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataStream<Tuple2<Long, Long>> stream = env + .addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() { + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + final long startTime = 
System.currentTimeMillis(); + + final long numElements = 20000000; + final long numKeys = 10000; + long val = 1L; + long count = 0L; + + + while (running && count < numElements) { + count++; + ctx.collect(new Tuple2<Long, Long>(val++, 1L)); + + if (val > numKeys) { + val = 1L; + } + } + + final long endTime = System.currentTimeMillis(); + System.out.println("Took " + (endTime-startTime) + " msecs for " + numElements + " values"); + } + + @Override + public void cancel() { + running = false; + } + }); + + stream +// .groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) +// .window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS)) +// .reduceWindow(new SummingReducer()) +// .flatten() +// .partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) +// .transform( +// "Aligned time window", +// TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"), +// new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, Long>>( +// new SummingWindowFunction<Long>(), +// new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(), +// 2500, 500)) + .transform( + "Aligned time window", + TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"), + new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>( + new SummingReducer(), + new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(), + 2500, 500)) + + .addSink(new SinkFunction<Tuple2<Long, Long>>() { + @Override + public void invoke(Tuple2<Long, Long> value) { + } + }); + + env.execute(); + } + + public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> { + + @Override + @SuppressWarnings("unchecked") + public Key getKey(Type value) { + return (Key) value.getField(0); + } + } + + public static class IdentityKeyExtractor<T> implements KeySelector<T, T> { + + @Override + public T getKey(T value) { + return value; + } + } + + public static class IdentityWindowFunction<K, T> implements 
KeyedWindowFunction<K, T, T> { + + @Override + public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception { + for (T v : values) { + out.collect(v); + } + } + } + + public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> { + + @Override + public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception { + long count = 0; + for (T ignored : values) { + count++; + } + + out.collect(count); + } + } + + public static class SummingWindowFunction<K> implements KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> { + + @Override + public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K, Long>> out) throws Exception { + long sum = 0L; + for (Tuple2<K, Long> value : values) { + sum += value.f1; + } + + out.collect(new Tuple2<K, Long>(key, sum)); + } + } + + public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> { + + @Override + public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) { + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); + } + } +}