[FLINK-2683] [FLINK-2682] [runtime] Add dedicated operator for aligned processing time windows.
Also add utilities for heap-backed keyed state in panes (dedicated tailored hash table) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f58fab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f58fab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f58fab Branch: refs/heads/master Commit: b8f58fab56c727b8d44d2a90bbb389ab7544989e Parents: 7ad2103 Author: Stephan Ewen <se...@apache.org> Authored: Tue Sep 15 15:00:17 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Sep 22 13:25:20 2015 +0200 ---------------------------------------------------------------------- .../flink/api/common/functions/Function.java | 2 +- .../flink/runtime/util/UnionIterator.java | 100 +++ .../flink/runtime/util/UnionIteratorTest.java | 142 ++++ .../functions/windows/KeyedWindowFunction.java | 44 ++ .../api/operators/TimestampedCollector.java | 6 +- .../operators/StreamingOperatorMetrics.java | 27 + ...ractAlignedProcessingTimeWindowOperator.java | 270 ++++++++ .../windows/AbstractKeyedTimePanes.java | 76 +++ .../windows/AccumulatingKeyedTimePanes.java | 126 ++++ ...ccumulatingProcessingTimeWindowOperator.java | 48 ++ .../windows/AggregatingKeyedTimePanes.java | 103 +++ ...AggregatingProcessingTimeWindowOperator.java | 47 ++ .../runtime/operators/windows/KeyMap.java | 651 +++++++++++++++++++ .../runtime/operators/windows/package-info.java | 22 + .../runtime/tasks/StreamTaskException.java | 2 +- .../streaming/runtime/tasks/package-info.java | 27 + ...AlignedProcessingTimeWindowOperatorTest.java | 515 +++++++++++++++ ...AlignedProcessingTimeWindowOperatorTest.java | 528 +++++++++++++++ .../operators/windows/CollectingOutput.java | 80 +++ .../windows/KeyMapPutIfAbsentTest.java | 121 ++++ .../operators/windows/KeyMapPutTest.java | 136 ++++ .../runtime/operators/windows/KeyMapTest.java | 344 ++++++++++ .../GroupedProcessingTimeWindowExample.java | 167 +++++ 23 files changed, 3581 
insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java index 632a0d2..8a1819c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java @@ -24,5 +24,5 @@ package org.apache.flink.api.common.functions; * <p>This interface is empty in order to allow extending interfaces to * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p> */ -public interface Function { +public interface Function extends java.io.Serializable { } http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java new file mode 100644 index 0000000..c279adf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/UnionIterator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.TraversableOnceException; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public class UnionIterator<T> implements Iterator<T>, Iterable<T> { + + private Iterator<T> currentIterator; + + private ArrayList<List<T>> furtherLists = new ArrayList<>(); + + private int nextList; + + private boolean iteratorAvailable = true; + + // ------------------------------------------------------------------------ + + public void clear() { + currentIterator = null; + furtherLists.clear(); + nextList = 0; + iteratorAvailable = true; + } + + public void addList(List<T> list) { + if (currentIterator == null) { + currentIterator = list.iterator(); + } + else { + furtherLists.add(list); + } + } + + // ------------------------------------------------------------------------ + + @Override + public Iterator<T> iterator() { + if (iteratorAvailable) { + iteratorAvailable = false; + return this; + } else { + throw new TraversableOnceException(); + } + } + + @Override + public boolean hasNext() { + while (currentIterator != null) { + if (currentIterator.hasNext()) { + return true; + } + else if (nextList < furtherLists.size()) { + currentIterator = furtherLists.get(nextList).iterator(); + nextList++; + } + else { + currentIterator = null; + } + } + + return false; + } + + @Override + public T next() { + if (hasNext()) { + return currentIterator.next(); + } + else { + throw new NoSuchElementException(); + } + } + + @Override + 
public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java new file mode 100644 index 0000000..2a79b1f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/UnionIteratorTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.TraversableOnceException; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.junit.Assert.*; + +public class UnionIteratorTest { + + @Test + public void testUnion() { + try { + UnionIterator<Integer> iter = new UnionIterator<>(); + + // should succeed and be empty + assertFalse(iter.iterator().hasNext()); + + iter.clear(); + + try { + iter.iterator().next(); + fail("should fail with an exception"); + } catch (NoSuchElementException e) { + // expected + } + + iter.clear(); + iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + iter.addList(Collections.<Integer>emptyList()); + iter.addList(Arrays.asList(8, 9, 10, 11)); + + int val = 1; + for (int i : iter) { + assertEquals(val++, i); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testTraversableOnce() { + try { + UnionIterator<Integer> iter = new UnionIterator<>(); + + // should succeed + iter.iterator(); + + // should fail + try { + iter.iterator(); + fail("should fail with an exception"); + } catch (TraversableOnceException e) { + // expected + } + + // should fail again + try { + iter.iterator(); + fail("should fail with an exception"); + } catch (TraversableOnceException e) { + // expected + } + + // reset the thing, keep it empty + iter.clear(); + + // should succeed + iter.iterator(); + + // should fail + try { + iter.iterator(); + fail("should fail with an exception"); + } catch (TraversableOnceException e) { + // expected + } + + // should fail again + try { + iter.iterator(); + fail("should fail with an exception"); + } catch (TraversableOnceException e) { + // expected + } + + // reset the thing, add some data + iter.clear(); + iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + + // should succeed + Iterator<Integer> ints = iter.iterator(); + 
assertNotNull(ints.next()); + assertNotNull(ints.next()); + assertNotNull(ints.next()); + + // should fail if called in the middle of operations + try { + iter.iterator(); + fail("should fail with an exception"); + } catch (TraversableOnceException e) { + // expected + } + + // reset the thing, keep it empty + iter.clear(); + + // should succeed again + assertFalse(iter.iterator().hasNext()); + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java new file mode 100644 index 0000000..d7ca0a1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windows; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Base interface for functions that are evaluated over keyed (grouped) windows. + * + * @param <KEY> The type of the key. + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + */ +public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, Serializable { + + /** + * + * @param key + * @param values + * @param out + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void evaluate(KEY key, Iterable<IN> values, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java index 0ff223c..62514fc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java @@ -30,9 +30,13 @@ import org.joda.time.Instant; * @param <T> The type of the elments that can be emitted. 
*/ public class TimestampedCollector<T> implements Collector<T> { + private final Output<StreamRecord<T>> output; + + private final StreamRecord<T> reuse; + private long timestamp; - private StreamRecord<T> reuse; + /** * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}. http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java new file mode 100644 index 0000000..8429889 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators; + +public class StreamingOperatorMetrics { + + + public void incrementLateElementDiscarded() { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java new file mode 100644 index 0000000..f5f576d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.commons.math3.util.ArithmeticUtils; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.MathUtils; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.TriggerTimer; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + + +public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, Triggerable { + + private static final long serialVersionUID = 3245500864882459867L; + + private static final long MIN_SLIDE_TIME = 50; + + + private final SerializableObject lock = new SerializableObject(); + + // ----- fields for operator parametrization ----- + + private final Function function; + private final KeySelector<IN, KEY> keySelector; + + private final long windowSize; + private final long windowSlide; + private final long paneSize; + private final int numPanesPerWindow; + + // ----- fields for operator functionality ----- + + private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes; + + private transient TimestampedCollector<OUT> out; + + private transient TriggerTimer triggerTimer; + + private transient long nextEvaluationTime; + private transient long nextSlideTime; + + private transient volatile Throwable asyncError; + + + protected AbstractAlignedProcessingTimeWindowOperator( + Function function, + 
KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + if (function == null || keySelector == null) { + throw new NullPointerException(); + } + if (windowLength < MIN_SLIDE_TIME) { + throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs"); + } + if (windowSlide < MIN_SLIDE_TIME) { + throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs"); + } + if (windowLength < windowSlide) { + throw new IllegalArgumentException("The window size must be larger than the window slide"); + } + + final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide); + if (paneSlide < MIN_SLIDE_TIME) { + throw new IllegalArgumentException(String.format( + "Cannot compute window of size %d msecs sliding by %d msecs. " + + "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide)); + } + + this.function = function; + this.keySelector = keySelector; + this.windowSize = windowLength; + this.windowSlide = windowSlide; + this.paneSize = paneSlide; + this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide); + } + + + protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes( + KeySelector<IN, KEY> keySelector, Function function); + + // ------------------------------------------------------------------------ + // startup and shutdown + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration parameters) throws Exception { + out = new TimestampedCollector<>(output); + + // create the panes that gather the elements per slide + panes = createPanes(keySelector, function); + + // decide when to first compute the window and when to slide it + // the values should align with the start of time (that is, the UNIX epoch, not the big bang) + final long now = System.currentTimeMillis(); + nextEvaluationTime = now + windowSlide - (now % windowSlide); + nextSlideTime = now + 
paneSize - (now % paneSize); + + // start the trigger timer + triggerTimer = new TriggerTimer("Trigger for " + getRuntimeContext().getTaskName()); + + // schedule the first trigger + triggerTimer.scheduleTriggerAt(this, Math.min(nextEvaluationTime, nextSlideTime)); + } + + @Override + public void close() throws Exception { + // acquire the lock during shutdown, to prevent trigger calls at the same time + synchronized (lock) { + final long finalWindowTimestamp = nextEvaluationTime; + + // early stop the triggering thread, so it does not attempt to return any more data + stopTriggers(); + + // make sure we had no asynchronous error so far + checkErroneous(); + + // emit the remaining data + computeWindow(finalWindowTimestamp); + } + } + + @Override + public void dispose() { + // acquire the lock during shutdown, to prevent trigger calls at the same time + synchronized (lock) { + // fail-safe stop of the triggering thread (in case of an error) + stopTriggers(); + + // release the panes + panes.dispose(); + } + } + + private void stopTriggers() { + if (triggerTimer != null) { + triggerTimer.shutdown(); + } + + // reset the action timestamps. 
this makes sure any pending triggers will not evaluate + nextEvaluationTime = -1L; + nextSlideTime = -1L; + } + + // ------------------------------------------------------------------------ + // Receiving elements and triggers + // ------------------------------------------------------------------------ + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + synchronized (lock) { + checkErroneous(); + panes.addElementToLatestPane(element.getValue()); + } + } + + @Override + public void processWatermark(Watermark mark) { + // this operator does not react to watermarks + } + + @Override + public void trigger(long timestamp) { + synchronized (lock) { + // first we check if we actually trigger the window function + if (timestamp == nextEvaluationTime) { + // compute and output the results + try { + computeWindow(timestamp); + } + catch (Throwable t) { + this.asyncError = t; + } + + nextEvaluationTime += windowSlide; + } + + // check if we slide the panes by one. 
this may happen in addition to the + // window computation, or just by itself + if (timestamp == nextSlideTime) { + try { + panes.slidePanes(numPanesPerWindow); + } + catch (Throwable t) { + this.asyncError = t; + } + nextSlideTime += paneSize; + } + + long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); + triggerTimer.scheduleTriggerAt(this, nextTriggerTime); + } + } + + private void checkErroneous() throws Exception { + if (asyncError != null) { + throw new Exception("Error while computing and producing window result", asyncError); + } + } + + private void computeWindow(long timestamp) throws Exception { + out.setTimestamp(timestamp); + panes.truncatePanes(numPanesPerWindow); + panes.evaluateWindow(out); + } + + // ------------------------------------------------------------------------ + // Property access (for testing) + // ------------------------------------------------------------------------ + + public long getWindowSize() { + return windowSize; + } + + public long getWindowSlide() { + return windowSlide; + } + + public long getPaneSize() { + return paneSize; + } + + public int getNumPanesPerWindow() { + return numPanesPerWindow; + } + + public long getNextEvaluationTime() { + return nextEvaluationTime; + } + + public long getNextSlideTime() { + return nextSlideTime; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java new file mode 100644 index 0000000..a49b2e6 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.util.Collector; + +import java.util.ArrayDeque; + + +public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> { + + protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>(); + + protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>(); + + // ------------------------------------------------------------------------ + + public abstract void addElementToLatestPane(Type element) throws Exception; + + public abstract void evaluateWindow(Collector<Result> out) throws Exception; + + + public void dispose() { + // since all is heap data, there is no need to clean up anything + latestPane = null; + previousPanes.clear(); + } + + + public void slidePanes(int panesToKeep) { + if (panesToKeep > 1) { + // the current pane becomes the latest previous pane + previousPanes.addLast(latestPane); + + // truncate the history + while (previousPanes.size() >= panesToKeep) { + previousPanes.removeFirst(); + } + } + + // we need a new latest pane + latestPane = new KeyMap<>(); + } + + public void truncatePanes(int numToRetain) { + while (previousPanes.size() >= numToRetain) { + previousPanes.removeFirst(); + } + } + + protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{ + // gather all panes in an array (faster iterations) + @SuppressWarnings({"unchecked", "rawtypes"}) + KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]); + panes[panes.length - 1] = latestPane; + + // let the maps make a coordinated traversal and evaluate the window function per contained key + KeyMap.traverseMaps(panes, traversal, traversalPass); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java new file mode 100644 index 0000000..e776106 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.util.UnionIterator; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; + + +public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> { + + private final KeySelector<Type, Key> keySelector; + + private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); + + private final KeyedWindowFunction<Key, Type, Result> function; + + private long evaluationPass; + + // ------------------------------------------------------------------------ + + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Key, Type, Result> function) { + this.keySelector = keySelector; + this.function = function; + } + + // ------------------------------------------------------------------------ + + @Override + public void addElementToLatestPane(Type element) throws Exception { + Key k = keySelector.getKey(element); + ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory); + elements.add(element); + } + + @Override + public void evaluateWindow(Collector<Result> out) throws Exception { + if (previousPanes.isEmpty()) { + // optimized path for single pane case (tumbling window) + for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { + function.evaluate(entry.getKey(), entry.getValue(), out); + } + } + else { + // general code path for multi-pane case + WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out); + traverseAllPanes(evaluator, evaluationPass); + } + + evaluationPass++; + } + + // ------------------------------------------------------------------------ + // Running a window function in a map traversal + // 
------------------------------------------------------------------------ + + static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { + + private final KeyedWindowFunction<Key, Type, Result> function; + + private final UnionIterator<Type> unionIterator; + + private final Collector<Result> out; + + private Key currentKey; + + WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> function, Collector<Result> out) { + this.function = function; + this.out = out; + this.unionIterator = new UnionIterator<>(); + } + + + @Override + public void startNewKey(Key key) { + unionIterator.clear(); + currentKey = key; + } + + @Override + public void nextValue(ArrayList<Type> value) { + unionIterator.addList(value); + } + + @Override + public void keyDone() throws Exception { + function.evaluate(currentKey, unionIterator, out); + } + } + + // ------------------------------------------------------------------------ + // Lazy factory for lists (put if absent) + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() { + return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY; + } + + private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() { + + @Override + public ArrayList<?> create() { + return new ArrayList<>(4); + } + }; +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java new file mode 100644 index 0000000..16444c2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; + + +public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> { + + private static final long serialVersionUID = 7305948082830843475L; + + + public AccumulatingProcessingTimeWindowOperator( + KeyedWindowFunction<KEY, IN, OUT> function, + KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + super(function, keySelector, windowLength, windowSlide); + } + + @Override + protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { + @SuppressWarnings("unchecked") + KeyedWindowFunction<KEY, IN, OUT> windowFunction = (KeyedWindowFunction<KEY, IN, OUT>) function; + + return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java new file mode 100644 index 0000000..730c984 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.util.Collector; + + +public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> { + + private final KeySelector<Type, Key> keySelector; + + private final ReduceFunction<Type> reducer; + + private long evaluationPass; + + // ------------------------------------------------------------------------ + + public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) { + this.keySelector = keySelector; + this.reducer = reducer; + } + + // ------------------------------------------------------------------------ + + @Override + public void addElementToLatestPane(Type element) throws Exception { + Key k = keySelector.getKey(element); + latestPane.putOrAggregate(k, element, reducer); + } + + @Override + public void evaluateWindow(Collector<Type> out) throws Exception { + if (previousPanes.isEmpty()) { + // optimized path for single pane case + for (KeyMap.Entry<Key, Type> entry : latestPane) { + out.collect(entry.getValue()); + } + } + else { + // general code path for multi-pane case 
+ AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out); + traverseAllPanes(evaluator, evaluationPass); + } + + evaluationPass++; + } + + // ------------------------------------------------------------------------ + // The maps traversal that performs the final aggregation + // ------------------------------------------------------------------------ + + static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> { + + private final ReduceFunction<Type> function; + + private final Collector<Type> out; + + private Type currentValue; + + AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) { + this.function = function; + this.out = out; + } + + @Override + public void startNewKey(Key key) { + currentValue = null; + } + + @Override + public void nextValue(Type value) throws Exception { + if (currentValue != null) { + currentValue = function.reduce(currentValue, value); + } + else { + currentValue = value; + } + } + + @Override + public void keyDone() throws Exception { + out.collect(currentValue); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java new file mode 100644 index 0000000..8bed749 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java @@ -0,0 +1,47 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; + +public class AggregatingProcessingTimeWindowOperator<KEY, IN> + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> { + + private static final long serialVersionUID = 7305948082830843475L; + + + public AggregatingProcessingTimeWindowOperator( + ReduceFunction<IN> function, + KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + super(function, keySelector, windowLength, windowSlide); + } + + @Override + protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) { + @SuppressWarnings("unchecked") + ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function; + + return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java new file mode 100644 index 0000000..6e2d75e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/KeyMap.java @@ -0,0 +1,651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.runtime.util.MathUtils; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * A special Hash Map implementation that can be traversed efficiently in sync with other + * hash maps. + * <p> + * The differences between this hash map and Java's "java.util.HashMap" are: + * <ul> + * <li>A different hashing scheme. 
This implementation uses extensible hashing, meaning that + * each hash table growth takes one more lower hash code bit into account, and values that were + * formerly in the same bucket will afterwards be in the two adjacent buckets.</li> + * <li>This allows an efficient traversal of multiple hash maps together, even though the maps are + * of different sizes.</li> + * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li> + * <li>The map supports no removal/shrinking.</li> + * </ul> + */ +public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> { + + /** The minimum table capacity, 64 entries */ + private static final int MIN_CAPACITY = 0x40; + + /** The maximum possible table capacity, the largest positive power of + * two in the 32bit signed integer value range */ + private static final int MAX_CAPACITY = 0x40000000; + + /** The number of bits used for table addressing when table is at max capacity */ + private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY); + + // ------------------------------------------------------------------------ + + /** The hash index, as an array of entries */ + private Entry<K, V>[] table; + + /** The number of bits by which the hash code is shifted right, to find the bucket */ + private int shift; + + /** The number of elements in the hash table */ + private int numElements; + + /** The number of elements above which the hash table needs to grow */ + private int rehashThreshold; + + /** The base-2 logarithm of the table capacity */ + private int log2size; + + // ------------------------------------------------------------------------ + + /** + * Creates a new hash table with the default initial capacity. + */ + public KeyMap() { + this(0); + } + + /** + * Creates a new table with a capacity tailored to the given expected number of elements. + * + * @param expectedNumberOfElements The number of elements to tailor the capacity to. 
+ */ + public KeyMap(int expectedNumberOfElements) { + if (expectedNumberOfElements < 0) { + throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements); + } + + // round up to the next power of two + // guard against too small capacity and integer overflows + int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1; + capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY; + + // this also acts as a sanity check + log2size = MathUtils.log2strict(capacity); + shift = FULL_BIT_RANGE - log2size; + table = allocateTable(capacity); + rehashThreshold = getRehashThreshold(capacity); + } + + // ------------------------------------------------------------------------ + // Gets and Puts + // ------------------------------------------------------------------------ + + /** + * Inserts the given value, mapped under the given key. If the table already contains a value for + * the key, the value is replaced and returned. If no value is contained, yet, the function + * returns null. + * + * @param key The key to insert. + * @param value The value to insert. + * @return The previously mapped value for the key, or null, if no value was mapped for the key. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public final V put(K key, V value) { + final int hash = hash(key); + final int slot = indexOf (hash); + + // search the chain from the slot + for (Entry<K, V> e = table[slot]; e != null; e = e.next) { + Object k; + if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) { + // found match + V old = e.value; + e.value = value; + return old; + } + } + + // no match, insert a new value + insertNewEntry(hash, key, value, slot); + return null; + } + + /** + * Inserts a value for the given key, if no value is yet contained for that key. Otherwise, + * returns the value currently contained for the key. 
+ * <p> + * The value that is inserted in case that the key is not contained, yet, is lazily created + * using the given factory. + * + * @param key The key to insert. + * @param factory The factory that produces the value, if no value is contained, yet, for the key. + * @return The value in the map after this operation (either the previously contained value, or the + * newly created value). + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public final V putIfAbsent(K key, LazyFactory<V> factory) { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + // found match + return entry.value; + } + } + + // no match, insert a new value + V value = factory.create(); + insertNewEntry(hash, key, value, slot); + + // return the created value + return value; + } + + /** + * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key, + * this method inserts the value. If the table already contains the key (and a value) this + * method will use the given ReduceFunction function to combine the existing value and the + * given value to a new value, and store that value for the key. + * + * @param key The key to map the value. + * @param value The new value to insert, or aggregate with the existing value. + * @param aggregator The aggregator to use if a value is already contained. + * + * @return The value in the map after this operation: Either the given value, or the aggregated value. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + * @throws Exception The method forwards exceptions from the aggregation function. 
+ */ + public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + // found match + entry.value = aggregator.reduce(entry.value, value); + return entry.value; + } + } + + // no match, insert a new value + insertNewEntry(hash, key, value, slot); + // return the original value + return value; + } + + /** + * Looks up the value mapped under the given key. Returns null if no value is mapped under this key. + * + * @param key The key to look up. + * @return The value associated with the key, or null, if no value is found for the key. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public V get(K key) { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + return entry.value; + } + } + + // not found + return null; + } + + private void insertNewEntry(int hashCode, K key, V value, int position) { + Entry<K,V> e = table[position]; + table[position] = new Entry<>(key, value, hashCode, e); + numElements++; + + // rehash if necessary + if (numElements > rehashThreshold) { + growTable(); + } + } + + private int indexOf(int hashCode) { + return (hashCode >> shift) & (table.length - 1); + } + + /** + * Creates an iterator over the entries of this map. + * + * @return An iterator over the entries of this map. 
+ */ + @Override + public Iterator<Entry<K, V>> iterator() { + return new Iterator<Entry<K, V>>() { + + private final Entry<K, V>[] tab = KeyMap.this.table; + + private Entry<K, V> nextEntry; + + private int nextPos = 0; + + @Override + public boolean hasNext() { + if (nextEntry != null) { + return true; + } + else { + while (nextPos < tab.length) { + Entry<K, V> e = tab[nextPos++]; + if (e != null) { + nextEntry = e; + return true; + } + } + return false; + } + } + + @Override + public Entry<K, V> next() { + if (nextEntry != null || hasNext()) { + Entry<K, V> e = nextEntry; + nextEntry = nextEntry.next; + return e; + } + else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets the number of elements currently in the map. + * @return The number of elements currently in the map. + */ + public int size() { + return numElements; + } + + /** + * Checks whether the map is empty. + * @return True, if the map is empty, false otherwise. + */ + public boolean isEmpty() { + return numElements == 0; + } + + /** + * Gets the current table capacity, i.e., the number of slots in the hash table, without + * any overflow chaining. + * @return The number of slots in the hash table. + */ + public int getCurrentTableCapacity() { + return table.length; + } + + /** + * Gets the base-2 logarithm of the hash table capacity, as returned by + * {@link #getCurrentTableCapacity()}. + * + * @return The base-2 logarithm of the hash table capacity. 
+ */ + public int getLog2TableCapacity() { + return log2size; + } + + public int getRehashThreshold() { + return rehashThreshold; + } + + public int getShift() { + return shift; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private Entry<K, V>[] allocateTable(int numElements) { + return (Entry<K, V>[]) new Entry<?, ?>[numElements]; + } + + private void growTable() { + final int newSize = table.length << 1; + + // only grow if there is still space to grow the table + if (newSize > 0) { + final Entry<K, V>[] oldTable = this.table; + final Entry<K, V>[] newTable = allocateTable(newSize); + + final int newShift = shift - 1; + final int newMask = newSize - 1; + + // go over all slots from the table. since we hash to adjacent positions in + // the new hash table, this is actually cache efficient + for (Entry<K, V> entry : oldTable) { + // traverse the chain for each slot + while (entry != null) { + final int newPos = (entry.hashCode >> newShift) & newMask; + Entry<K, V> nextEntry = entry.next; + entry.next = newTable[newPos]; + newTable[newPos] = entry; + entry = nextEntry; + } + } + + this.table = newTable; + this.shift = newShift; + this.rehashThreshold = getRehashThreshold(newSize); + this.log2size += 1; + } + } + + private static int hash(Object key) { + int code = key.hashCode(); + + // we need a strong hash function that generates diverse upper bits + // this hash function is more expensive than the "scramble" used by "java.util.HashMap", + // but required for this sort of hash table + code = (code + 0x7ed55d16) + (code << 12); + code = (code ^ 0xc761c23c) ^ (code >>> 19); + code = (code + 0x165667b1) + (code << 5); + code = (code + 0xd3a2646c) ^ (code << 9); + code = (code + 0xfd7046c5) + (code << 3); + return (code ^ 0xb55a4f09) ^ (code >>> 16); + } + + private static int getRehashThreshold(int 
capacity) { + // divide before multiply, to avoid overflow + return capacity / 4 * 3; + } + + // ------------------------------------------------------------------------ + // Testing Utilities + // ------------------------------------------------------------------------ + + /** + * For testing only: Actively counts the number of entries, rather than using the + * counter variable. This method has linear complexity, rather than constant. + * + * @return The counted number of entries. + */ + int traverseAndCountElements() { + int num = 0; + + for (Entry<?, ?> entry : table) { + while (entry != null) { + num++; + entry = entry.next; + } + } + + return num; + } + + /** + * For testing only: Gets the length of the longest overflow chain. + * This method has linear complexity. + * + * @return The length of the longest overflow chain. + */ + int getLongestChainLength() { + int maxLen = 0; + + for (Entry<?, ?> entry : table) { + int thisLen = 0; + while (entry != null) { + thisLen++; + entry = entry.next; + } + maxLen = Math.max(maxLen, thisLen); + } + + return maxLen; + } + + // ------------------------------------------------------------------------ + + /** + * An entry in the hash table. + * + * @param <K> Type of the key. + * @param <V> Type of the value. + */ + public static final class Entry<K, V> { + + final K key; + final int hashCode; + + V value; + Entry<K, V> next; + long touchedTag; + + Entry(K key, V value, int hashCode, Entry<K, V> next) { + this.key = key; + this.value = value; + this.next = next; + this.hashCode = hashCode; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } + + // ------------------------------------------------------------------------ + + /** + * Performs a traversal over the logical multi-map that results from the union of the + * given maps. This method does not actually build a union of the maps, but traverses the hash maps + * together. 
+ * + * @param maps The array of maps whose union should be traversed. + * @param visitor The visitor that is called for each key and all values. + * @param touchedTag A tag that is used to mark elements that have been touched in this specific + * traversal. Each successive traversal should supply a larger value for this + * tag than the previous one. + * + * @param <K> The type of the map's key. + * @param <V> The type of the map's value. + */ + public static <K, V> void traverseMaps( + final KeyMap<K, V>[] maps, + final TraversalEvaluator<K, V> visitor, + final long touchedTag) + throws Exception + { + // we need to work on the maps in descending size + Arrays.sort(maps, CapacityDescendingComparator.INSTANCE); + + final int[] shifts = new int[maps.length]; + final int[] lowBitsMask = new int[maps.length]; + final int numSlots = maps[0].table.length; + final int numTables = maps.length; + + // figure out how much each hash table collapses the entries + for (int i = 0; i < numTables; i++) { + shifts[i] = maps[0].log2size - maps[i].log2size; + lowBitsMask[i] = (1 << shifts[i]) - 1; + } + + // go over all slots (based on the largest hash table) + for (int pos = 0; pos < numSlots; pos++) { + + // for each slot, go over all tables, until the table does not have that slot any more + // for tables where multiple slots collapse into one, we visit that one when we process the + // latest of all slots that collapse to that one + int mask; + for (int rootTable = 0; + rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask; + rootTable++) + { + // use that table to gather keys and start collecting keys from the following tables + // go over all entries of that slot in the table + Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]]; + while (entry != null) { + // take only entries that have not been collected as part of other tables + if (entry.touchedTag < touchedTag) { + entry.touchedTag = touchedTag; + + final K key = entry.key; + final int 
hashCode = entry.hashCode; + visitor.startNewKey(key); + visitor.nextValue(entry.value); + + addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode); + + // go over the other hash tables and collect their entries for the key + for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) { + Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]]; + if (followupEntry != null) { + addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode); + } + } + + visitor.keyDone(); + } + + entry = entry.next; + } + } + } + } + + private static <K, V> void addEntriesFromChain( + Entry<K, V> entry, + TraversalEvaluator<K, V> visitor, + K key, + long touchedTag, + int hashCode) throws Exception + { + while (entry != null) { + if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) { + entry.touchedTag = touchedTag; + visitor.nextValue(entry.value); + } + entry = entry.next; + } + } + + // ------------------------------------------------------------------------ + + /** + * Comparator that defines a descending order on maps depending on their table capacity + * and number of elements. + */ + static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> { + + static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator(); + + private CapacityDescendingComparator() {} + + + @Override + public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) { + // this sorts descending + int cmp = o2.getLog2TableCapacity() - o1.getLog2TableCapacity(); + if (cmp != 0) { + return cmp; + } + else { + return o2.size() - o1.size(); + } + } + } + + // ------------------------------------------------------------------------ + + /** + * A factory for lazy/on-demand instantiation of values. + * + * @param <V> The type created by the factory. + */ + public static interface LazyFactory<V> { + + /** + * The factory method; creates the value. + * @return The value. 
+ */ + V create(); + } + + // ------------------------------------------------------------------------ + + /** + * A visitor for a traversal over the union of multiple hash maps. The visitor is + * called for each key in the union of the maps and all values associated with that key + * (one per map, but multiple across maps). + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ + public static interface TraversalEvaluator<K, V> { + + /** + * Called whenever the traversal starts with a new key. + * + * @param key The key traversed. + * @throws Exception Method forwards all exceptions. + */ + void startNewKey(K key) throws Exception; + + /** + * Called for each value found for the current key. + * + * @param value The next value. + * @throws Exception Method forwards all exceptions. + */ + void nextValue(V value) throws Exception; + + /** + * Called when the traversal for the current key is complete. + * + * @throws Exception Method forwards all exceptions. + */ + void keyDone() throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java new file mode 100644 index 0000000..63ed470 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the operators that implement the various window operations + * on data streams. + */ +package org.apache.flink.streaming.runtime.operators.windows; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java index d93078b..5680810 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; /** - * An exception that is thrown by the stream verices when encountering an + * An exception that is thrown by the stream vertices when encountering an * illegal condition. 
*/ public class StreamTaskException extends RuntimeException { http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java new file mode 100644 index 0000000..a40ae3a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes that realize streaming tasks. These tasks are + * executable stream consumers and producers that are scheduled by the distributed + * dataflow runtime. Each task occupies one execution slot and is run by an + * executing thread. + * <p> + * The tasks merely set up the distributed stream coordination and the checkpointing. 
+ * Internally, the tasks create one or more operators that perform the stream transformations. + */ +package org.apache.flink.streaming.runtime.tasks; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/b8f58fab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java new file mode 100644 index 0000000..685939b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windows; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.operators.TriggerTimer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import org.apache.flink.util.Collector; +import org.junit.After; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +@SuppressWarnings("serial") +public class AccumulatingAlignedProcessingTimeWindowOperatorTest { + + @SuppressWarnings("unchecked") + private final KeyedWindowFunction<String, String, String> mockFunction = mock(KeyedWindowFunction.class); + + @SuppressWarnings("unchecked") + private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); + + private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() { + @Override + public Integer getKey(Integer value) { + return value; + } + }; + + private final KeyedWindowFunction<Integer, Integer, Integer> validatingIdentityFunction = + new KeyedWindowFunction<Integer, Integer, Integer>() + { + @Override + public void evaluate(Integer key, Iterable<Integer> values, Collector<Integer> out) { + for (Integer val : values) { + assertEquals(key, val); + out.collect(val); + } + } + }; + + // ------------------------------------------------------------------------ + + @After + public void checkNoTriggerThreadsRunning() { + // make sure that all the threads we trigger are shut down + long deadline = System.currentTimeMillis() + 5000; + while (TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() > 0 && 
System.currentTimeMillis() < deadline) { + try { + Thread.sleep(10); + } + catch (InterruptedException ignored) {} + } + + assertTrue("Not all trigger threads where properly shut down", + TriggerTimer.TRIGGER_THREADS_GROUP.activeCount() == 0); + } + + // ------------------------------------------------------------------------ + + @Test + public void testInvalidParameters() { + try { + assertInvalidParameter(-1L, -1L); + assertInvalidParameter(10000L, -1L); + assertInvalidParameter(-1L, 1000L); + assertInvalidParameter(1000L, 2000L); + + // actual internal slide is too low here: + assertInvalidParameter(1000L, 999L); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testWindowSizeAndSlide() { + try { + AbstractAlignedProcessingTimeWindowOperator<String, String, String> op; + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000); + assertEquals(5000, op.getWindowSize()); + assertEquals(1000, op.getWindowSlide()); + assertEquals(1000, op.getPaneSize()); + assertEquals(5, op.getNumPanesPerWindow()); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000); + assertEquals(1000, op.getWindowSize()); + assertEquals(1000, op.getWindowSlide()); + assertEquals(1000, op.getPaneSize()); + assertEquals(1, op.getNumPanesPerWindow()); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000); + assertEquals(1500, op.getWindowSize()); + assertEquals(1000, op.getWindowSlide()); + assertEquals(500, op.getPaneSize()); + assertEquals(3, op.getNumPanesPerWindow()); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100); + assertEquals(1200, op.getWindowSize()); + assertEquals(1100, op.getWindowSlide()); + assertEquals(100, op.getPaneSize()); + assertEquals(12, op.getNumPanesPerWindow()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + 
} + } + + @Test + public void testWindowTriggerTimeAlignment() { + try { + @SuppressWarnings("unchecked") + final Output<StreamRecord<String>> mockOut = mock(Output.class); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + AbstractAlignedProcessingTimeWindowOperator<String, String, String> op; + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000); + op.setup(mockOut, mockContext); + op.open(new Configuration()); + assertTrue(op.getNextSlideTime() % 1000 == 0); + assertTrue(op.getNextEvaluationTime() % 1000 == 0); + op.dispose(); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000); + op.setup(mockOut, mockContext); + op.open(new Configuration()); + assertTrue(op.getNextSlideTime() % 1000 == 0); + assertTrue(op.getNextEvaluationTime() % 1000 == 0); + op.dispose(); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000); + op.setup(mockOut, mockContext); + op.open(new Configuration()); + assertTrue(op.getNextSlideTime() % 500 == 0); + assertTrue(op.getNextEvaluationTime() % 1000 == 0); + op.dispose(); + + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100); + op.setup(mockOut, mockContext); + op.open(new Configuration()); + assertTrue(op.getNextSlideTime() % 100 == 0); + assertTrue(op.getNextEvaluationTime() % 1100 == 0); + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testTumblingWindow() { + try { + final int windowSize = 50; + final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + // tumbling window that triggers every 20 milliseconds + 
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityFunction, identitySelector, windowSize, windowSize); + + op.setup(out, mockContext); + op.open(new Configuration()); + + final int numElements = 1000; + + for (int i = 0; i < numElements; i++) { + op.processElement(new StreamRecord<Integer>(i)); + Thread.sleep(1); + } + + op.close(); + op.dispose(); + + // get and verify the result + List<Integer> result = out.getElements(); + assertEquals(numElements, result.size()); + + Collections.sort(result); + for (int i = 0; i < numElements; i++) { + assertEquals(i, result.get(i).intValue()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSlidingWindow() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(50); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + // tumbling window that triggers every 20 milliseconds + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50); + + op.setup(out, mockContext); + op.open(new Configuration()); + + final int numElements = 1000; + + for (int i = 0; i < numElements; i++) { + op.processElement(new StreamRecord<Integer>(i)); + Thread.sleep(1); + } + + op.close(); + op.dispose(); + + // get and verify the result + List<Integer> result = out.getElements(); + + // if we kept this running, each element would be in the result three times (for each slide). + // we are closing the window before the final panes are through three times, so we may have less + // elements. 
+ if (result.size() < numElements || result.size() > 3 * numElements) { + fail("Wrong number of results: " + result.size()); + } + + Collections.sort(result); + int lastNum = -1; + int lastCount = -1; + + for (int num : result) { + if (num == lastNum) { + lastCount++; + assertTrue(lastCount <= 3); + } + else { + lastNum = num; + lastCount = 1; + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testTumblingWindowSingleElements() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(50); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + // tumbling window that triggers every 20 milliseconds + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50); + + op.setup(out, mockContext); + op.open(new Configuration()); + + op.processElement(new StreamRecord<Integer>(1)); + op.processElement(new StreamRecord<Integer>(2)); + out.waitForNElements(2, 60000); + + op.processElement(new StreamRecord<Integer>(3)); + op.processElement(new StreamRecord<Integer>(4)); + op.processElement(new StreamRecord<Integer>(5)); + out.waitForNElements(5, 60000); + + op.processElement(new StreamRecord<Integer>(6)); + out.waitForNElements(6, 60000); + + List<Integer> result = out.getElements(); + assertEquals(6, result.size()); + + Collections.sort(result); + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result); + + op.close(); + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSlidingWindowSingleElements() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(50); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test 
task name"); + + // tumbling window that triggers every 20 milliseconds + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50); + + op.setup(out, mockContext); + op.open(new Configuration()); + + op.processElement(new StreamRecord<Integer>(1)); + op.processElement(new StreamRecord<Integer>(2)); + + // each element should end up in the output three times + // wait until the elements have arrived 6 times in the output + out.waitForNElements(6, 120000); + + List<Integer> result = out.getElements(); + assertEquals(6, result.size()); + + Collections.sort(result); + assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result); + + op.close(); + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testEmitTrailingDataOnClose() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + // the operator has a window time that is so long that it will not fire in this test + final long oneYear = 365L * 24 * 60 * 60 * 1000; + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, + oneYear, oneYear); + + op.setup(out, mockContext); + op.open(new Configuration()); + + List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + for (Integer i : data) { + op.processElement(new StreamRecord<Integer>(i)); + } + + op.close(); + op.dispose(); + + // get and verify the result + List<Integer> result = out.getElements(); + Collections.sort(result); + assertEquals(data, result); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void 
testPropagateExceptionsFromTrigger() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100); + + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>(failingFunction, identitySelector, 50, 50); + + op.setup(out, mockContext); + op.open(new Configuration()); + + try { + int num = 0; + while (num < Integer.MAX_VALUE) { + op.processElement(new StreamRecord<Integer>(num++)); + Thread.sleep(1); + } + fail("This should really have failed with an exception quite a while ago..."); + } + catch (Exception e) { + assertNotNull(e.getCause()); + assertTrue(e.getCause().getMessage().contains("Artificial Test Exception")); + } + + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPropagateExceptionsFromClose() { + try { + final CollectingOutput<Integer> out = new CollectingOutput<>(); + + final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class); + when(mockContext.getTaskName()).thenReturn("Test task name"); + + KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100); + + // the operator has a window time that is so long that it will not fire in this test + final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000; + AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + failingFunction, identitySelector, hundredYears, hundredYears); + + op.setup(out, mockContext); + op.open(new Configuration()); + + for (int i = 0; i < 150; i++) { + op.processElement(new StreamRecord<Integer>(i)); + } + + try { + op.close(); + fail("This should fail with an 
exception"); + } + catch (Exception e) { + assertTrue( + e.getMessage().contains("Artificial Test Exception") || + (e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception"))); + } + + op.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + + private void assertInvalidParameter(long windowSize, long windowSlide) { + try { + new AccumulatingProcessingTimeWindowOperator<String, String, String>( + mockFunction, mockKeySelector, windowSize, windowSlide); + fail("This should fail with an IllegalArgumentException"); + } + catch (IllegalArgumentException e) { + // expected + } + catch (Exception e) { + fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName()); + } + } + + // ------------------------------------------------------------------------ + + private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer> { + + private final int failAfterElements; + + private int numElements; + + FailingFunction(int failAfterElements) { + this.failAfterElements = failAfterElements; + } + + @Override + public void evaluate(Integer integer, Iterable<Integer> values, Collector<Integer> out) throws Exception { + for (Integer i : values) { + out.collect(i); + numElements++; + + if (numElements >= failAfterElements) { + throw new Exception("Artificial Test Exception"); + } + } + } + } +}