Move window operators and tests to windowing package. The API package (org.apache.flink.streaming.api.windowing) is also called windowing; this change harmonizes the package names.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05d2138f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05d2138f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05d2138f Branch: refs/heads/master Commit: 05d2138f081ff5fa274dab571b9327f96be693aa Parents: a606c4a Author: Aljoscha Krettek <[email protected]> Authored: Thu Sep 24 16:33:09 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:16 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/KeyedWindowDataStream.java | 2 +- ...ractAlignedProcessingTimeWindowOperator.java | 223 +++++++ .../windowing/AbstractKeyedTimePanes.java | 76 +++ .../windowing/AccumulatingKeyedTimePanes.java | 126 ++++ ...ccumulatingProcessingTimeWindowOperator.java | 48 ++ .../windowing/AggregatingKeyedTimePanes.java | 103 +++ ...AggregatingProcessingTimeWindowOperator.java | 47 ++ .../runtime/operators/windowing/KeyMap.java | 651 +++++++++++++++++++ .../operators/windowing/PolicyToOperator.java | 82 +++ .../operators/windowing/package-info.java | 22 + ...ractAlignedProcessingTimeWindowOperator.java | 223 ------- .../windows/AbstractKeyedTimePanes.java | 76 --- .../windows/AccumulatingKeyedTimePanes.java | 126 ---- ...ccumulatingProcessingTimeWindowOperator.java | 48 -- .../windows/AggregatingKeyedTimePanes.java | 103 --- ...AggregatingProcessingTimeWindowOperator.java | 47 -- .../runtime/operators/windows/KeyMap.java | 651 ------------------- .../operators/windows/PolicyToOperator.java | 82 --- .../runtime/operators/windows/package-info.java | 22 - ...AlignedProcessingTimeWindowOperatorTest.java | 547 ++++++++++++++++ ...AlignedProcessingTimeWindowOperatorTest.java | 550 ++++++++++++++++ .../operators/windowing/CollectingOutput.java | 80 +++ .../windowing/KeyMapPutIfAbsentTest.java | 121 ++++ .../operators/windowing/KeyMapPutTest.java | 136 ++++ 
.../runtime/operators/windowing/KeyMapTest.java | 344 ++++++++++ ...AlignedProcessingTimeWindowOperatorTest.java | 547 ---------------- ...AlignedProcessingTimeWindowOperatorTest.java | 551 ---------------- .../operators/windows/CollectingOutput.java | 80 --- .../windows/KeyMapPutIfAbsentTest.java | 121 ---- .../operators/windows/KeyMapPutTest.java | 136 ---- .../runtime/operators/windows/KeyMapTest.java | 344 ---------- 31 files changed, 3157 insertions(+), 3158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java index dfb7032..37151d7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; -import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator; +import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator; /** * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and 
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java new file mode 100644 index 0000000..6c4e53a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.commons.math3.util.ArithmeticUtils; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.MathUtils; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** Base class for keyed processing-time window operators whose evaluation and slide times are aligned to multiples of the slide/pane size since the UNIX epoch. Elements are collected per key into panes of gcd(windowLength, windowSlide) msecs, and the window function is evaluated every windowSlide msecs over the panes covering the last windowLength msecs. */ +public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT>, Triggerable { + + private static final long serialVersionUID = 3245500864882459867L; + + private static final long MIN_SLIDE_TIME = 50; // msecs; lower bound for the window length, the slide and the derived pane size + + // ----- fields for operator parametrization ----- + + private final Function function; + private final KeySelector<IN, KEY> keySelector; + + private final long windowSize; + private final long windowSlide; + private final long paneSize; + private final int numPanesPerWindow; + + // ----- fields for operator functionality ----- + + private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes; + + private transient TimestampedCollector<OUT> out; + + private transient long nextEvaluationTime; + private transient long nextSlideTime; + + protected AbstractAlignedProcessingTimeWindowOperator( + Function function, + KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + if (function == null || keySelector == null) { + throw new NullPointerException(); + } + if (windowLength < MIN_SLIDE_TIME) { + throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs"); + } 
+ if (windowSlide < MIN_SLIDE_TIME) { + throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs"); + } + if (windowLength < windowSlide) { // NOTE: length == slide (a tumbling window) is accepted, although the message says "larger than" + throw new IllegalArgumentException("The window size must be larger than the window slide"); + } + + final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide); // pane = largest unit that evenly divides both the window length and the slide + if (paneSlide < MIN_SLIDE_TIME) { + throw new IllegalArgumentException(String.format( + "Cannot compute window of size %d msecs sliding by %d msecs. " + + "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide)); + } + + this.function = function; + this.keySelector = keySelector; + this.windowSize = windowLength; + this.windowSlide = windowSlide; + this.paneSize = paneSlide; + this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide); + } + + /** Creates the pane structure that collects the incoming elements per key; called once from open(). */ + protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes( + KeySelector<IN, KEY> keySelector, Function function); + + // ------------------------------------------------------------------------ + // startup and shutdown + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration parameters) throws Exception { + out = new TimestampedCollector<>(output); + + // create the panes that gather the elements per slide + panes = createPanes(keySelector, function); + + // decide when to first compute the window and when to slide it + // the values should align with the start of time (that is, the UNIX epoch, not the big bang) + final long now = System.currentTimeMillis(); + nextEvaluationTime = now + windowSlide - (now % windowSlide); + nextSlideTime = now + paneSize - (now % paneSize); + + getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this); + } + + @Override + public void close() throws Exception { + final long finalWindowTimestamp = nextEvaluationTime; + + // early stop the triggering thread, so it does not attempt to return any 
more data + stopTriggers(); + + // emit the remaining data + computeWindow(finalWindowTimestamp); + } + + @Override + public void dispose() { + // no lock is acquired here; stopTriggers() only resets the timestamps that trigger() compares against + // fail-safe stop of the triggering thread (in case of an error) + stopTriggers(); + + // release the panes (NOTE(review): panes is only assigned in open(), so this throws an NPE if open() did not get that far) + panes.dispose(); + } + + private void stopTriggers() { + // reset the action timestamps. this makes sure any pending triggers will not evaluate + nextEvaluationTime = -1L; + nextSlideTime = -1L; + } + + // ------------------------------------------------------------------------ + // Receiving elements and triggers + // ------------------------------------------------------------------------ + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + panes.addElementToLatestPane(element.getValue()); + } + + @Override + public void processWatermark(Watermark mark) { + // this operator does not react to watermarks + } + + @Override + public void trigger(long timestamp) throws Exception { + // first we check if we actually trigger the window function + if (timestamp == nextEvaluationTime) { + // compute and output the results + computeWindow(timestamp); + + nextEvaluationTime += windowSlide; + } + + // check if we slide the panes by one. 
this may happen in addition to the + // window computation, or just by itself + if (timestamp == nextSlideTime) { + panes.slidePanes(numPanesPerWindow); + nextSlideTime += paneSize; + } + + long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); // NOTE(review): after stopTriggers() both are -1, so a late trigger re-registers a timer at -1 - confirm registerTimer() handles past timestamps + getRuntimeContext().registerTimer(nextTriggerTime, this); + } + + private void computeWindow(long timestamp) throws Exception { + out.setTimestamp(timestamp); + panes.truncatePanes(numPanesPerWindow); // drop panes that no longer belong to the window before evaluating it + panes.evaluateWindow(out); + } + + // ------------------------------------------------------------------------ + // Property access (for testing) + // ------------------------------------------------------------------------ + + public long getWindowSize() { + return windowSize; + } + + public long getWindowSlide() { + return windowSlide; + } + + public long getPaneSize() { + return paneSize; + } + + public int getNumPanesPerWindow() { + return numPanesPerWindow; + } + + public long getNextEvaluationTime() { + return nextEvaluationTime; + } + + public long getNextSlideTime() { + return nextSlideTime; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java new file mode 100644 
index 0000000..fae024b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.util.Collector; + +import java.util.ArrayDeque; + +/** Base class for keyed time panes: a latest pane that receives new elements plus a bounded, oldest-first history of previous panes, which are combined when a window is evaluated. */ +public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> { + + protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>(); + + protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>(); + + // ------------------------------------------------------------------------ + + public abstract void addElementToLatestPane(Type element) throws Exception; + + public abstract void evaluateWindow(Collector<Result> out) throws Exception; + + /** Drops the references to all panes; the pane data lives on the heap only. */ + public void dispose() { + // since all is heap data, there is no need to clean up anything + latestPane = null; + previousPanes.clear(); + } + + /** Starts a new latest pane. For panesToKeep > 1 the old latest pane moves into the history, which is trimmed to panesToKeep - 1 entries; otherwise the old latest pane is discarded. */ + public void slidePanes(int panesToKeep) { + if (panesToKeep > 1) { + // the current pane becomes the latest previous pane + previousPanes.addLast(latestPane); + + // truncate the history + while (previousPanes.size() >= panesToKeep) { + previousPanes.removeFirst(); + } + } + + // we need a new latest pane + latestPane = new KeyMap<>(); + } + + public void truncatePanes(int numToRetain) { // keeps at most numToRetain - 1 previous panes (numToRetain must be >= 1, otherwise removeFirst() on the emptied deque throws) + while (previousPanes.size() >= numToRetain) { + previousPanes.removeFirst(); + } + } + + protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{ + // gather all panes in an array (faster iterations) + @SuppressWarnings({"unchecked", "rawtypes"}) + KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]); + panes[panes.length - 1] = latestPane; // previous panes come oldest-first, the latest pane goes in the extra last slot + + // let the maps make a coordinated traversal and evaluate the window function per contained key + KeyMap.traverseMaps(panes, traversal, traversalPass); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java new file mode 100644 index 0000000..d85c53e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.util.UnionIterator; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; + +/** Keyed time panes that keep every element in a per-key list and hand all elements of the window for a key to a KeyedWindowFunction on evaluation. */ +public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> { + + private final KeySelector<Type, Key> keySelector; + + private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); + + private final KeyedWindowFunction<Type, Result, Key> function; + + private long evaluationPass; // counter passed to traverseAllPanes(), incremented after every evaluation + + // ------------------------------------------------------------------------ + + public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) { + this.keySelector = keySelector; + this.function = function; + } + + // ------------------------------------------------------------------------ + + @Override + public void addElementToLatestPane(Type element) throws Exception { + Key k = keySelector.getKey(element); + ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory); // returns the key's list in this pane, creating it via listFactory if absent + elements.add(element); + } + + @Override + public void evaluateWindow(Collector<Result> out) throws Exception { + if (previousPanes.isEmpty()) { + // optimized path for single pane case (tumbling window) + for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { + function.evaluate(entry.getKey(), entry.getValue(), out); + } + } + else { + // general code path for multi-pane case + WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out); + traverseAllPanes(evaluator, evaluationPass); + } + + evaluationPass++; + } + + // ------------------------------------------------------------------------ + // Running a window function in a map traversal + // 
------------------------------------------------------------------------ + + static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { // unions a key's lists from all panes and calls the window function once per key + + private final KeyedWindowFunction<Type, Result, Key> function; + + private final UnionIterator<Type> unionIterator; + + private final Collector<Result> out; + + private Key currentKey; + + WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) { + this.function = function; + this.out = out; + this.unionIterator = new UnionIterator<>(); + } + + // NOTE(review): the same UnionIterator is reused (cleared) for every key, so the window function presumably must not retain it - TODO confirm + @Override + public void startNewKey(Key key) { + unionIterator.clear(); + currentKey = key; + } + + @Override + public void nextValue(ArrayList<Type> value) { + unionIterator.addList(value); + } + + @Override + public void keyDone() throws Exception { + function.evaluate(currentKey, unionIterator, out); + } + } + + // ------------------------------------------------------------------------ + // Lazy factory for lists (put if absent) + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() { + return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY; + } + + private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() { + + @Override + public ArrayList<?> create() { + return new ArrayList<>(4); + } + }; +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java new file mode 100644 index 0000000..4df308d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; + +/** Aligned processing-time window operator that keeps all elements of a window per key and evaluates them with a KeyedWindowFunction. */ +public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> { + + private static final long serialVersionUID = 7305948082830843475L; + + /** Creates the operator; null arguments and the length/slide constraints are checked by the superclass constructor. */ + public AccumulatingProcessingTimeWindowOperator( + KeyedWindowFunction<IN, OUT, KEY> function, + KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + super(function, keySelector, windowLength, windowSlide); + } + + @Override + protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { + @SuppressWarnings("unchecked") + KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function; // the function is the KeyedWindowFunction given to this class's constructor + + return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java new file mode 100644 index 0000000..48f4eb1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.util.Collector; + +/** Keyed time panes that pre-aggregate the elements of each key with a ReduceFunction per pane, and reduce the per-pane results again when a window is evaluated. */ +public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> { + + private final KeySelector<Type, Key> keySelector; + + private final ReduceFunction<Type> reducer; + + private long evaluationPass; // counter passed to traverseAllPanes(), incremented after every evaluation + + // ------------------------------------------------------------------------ + + public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) { + this.keySelector = keySelector; + this.reducer = reducer; + } + + // ------------------------------------------------------------------------ + + @Override + public void addElementToLatestPane(Type element) throws Exception { + Key k = keySelector.getKey(element); + latestPane.putOrAggregate(k, element, reducer); // stores the element, or reduces it into the key's existing value in this pane + } + + @Override + public void evaluateWindow(Collector<Type> out) throws Exception { + if (previousPanes.isEmpty()) { + // optimized path for single pane case + for (KeyMap.Entry<Key, Type> entry : latestPane) { + out.collect(entry.getValue()); + } + } + else { + // general code path for 
multi-pane case + AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out); + traverseAllPanes(evaluator, evaluationPass); + } + + evaluationPass++; + } + + // ------------------------------------------------------------------------ + // The maps traversal that performs the final aggregation + // ------------------------------------------------------------------------ + + static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> { + + private final ReduceFunction<Type> function; + + private final Collector<Type> out; + + private Type currentValue; + + AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) { + this.function = function; + this.out = out; + } + + @Override + public void startNewKey(Key key) { + currentValue = null; + } + + @Override + public void nextValue(Type value) throws Exception { + if (currentValue != null) { // null means "no value yet for this key"; a null returned by the reducer would restart the aggregation + currentValue = function.reduce(currentValue, value); + } + else { + currentValue = value; + } + } + + @Override + public void keyDone() throws Exception { + out.collect(currentValue); // NOTE(review): emits null if nextValue() was never called for this key - confirm traverseMaps always supplies at least one value + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java new file mode 100644 index 0000000..99457bb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java @@ 
-0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +/** Aligned processing-time window operator that reduces the elements of each key with a ReduceFunction and emits one reduced value per key and window. */ +public class AggregatingProcessingTimeWindowOperator<KEY, IN> + extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> { + + private static final long serialVersionUID = 7305948082830843475L; // NOTE(review): same value as AccumulatingProcessingTimeWindowOperator's serialVersionUID, likely copy-paste + + /** Creates the operator; null arguments and the length/slide constraints are checked by the superclass constructor. */ + public AggregatingProcessingTimeWindowOperator( + ReduceFunction<IN> function, + KeySelector<IN, KEY> keySelector, + long windowLength, + long windowSlide) + { + super(function, keySelector, windowLength, windowSlide); + } + + @Override + protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) { + @SuppressWarnings("unchecked") + ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function; // the function is the ReduceFunction given to this class's constructor + + return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java new file mode 100644 index 0000000..3f44c4a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java @@ -0,0 +1,651 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.runtime.util.MathUtils; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * A special Hash Map implementation that can be traversed efficiently in sync with other + * hash maps. 
+ * <p> + * The differences between this hash map and Java's "java.util.HashMap" are: + * <ul> + * <li>A different hashing scheme. This implementation uses extensible hashing, meaning that + * each hash table growth takes one more lower hash code bit into account, and values that where + * formerly in the same bucket will afterwards be in the two adjacent buckets.</li> + * <li>This allows an efficient traversal of multiple hash maps together, even though the maps are + * of different sizes.</li> + * <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li> + * <li>The map supports no removal/shrinking.</li> + * </ul> + */ +public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> { + + /** The minimum table capacity, 64 entries */ + private static final int MIN_CAPACITY = 0x40; + + /** The maximum possible table capacity, the largest positive power of + * two in the 32bit signed integer value range */ + private static final int MAX_CAPACITY = 0x40000000; + + /** The number of bits used for table addressing when table is at max capacity */ + private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY); + + // ------------------------------------------------------------------------ + + /** The hash index, as an array of entries */ + private Entry<K, V>[] table; + + /** The number of bits by which the hash code is shifted right, to find the bucket */ + private int shift; + + /** The number of elements in the hash table */ + private int numElements; + + /** The number of elements above which the hash table needs to grow */ + private int rehashThreshold; + + /** The base-2 logarithm of the table capacity */ + private int log2size; + + // ------------------------------------------------------------------------ + + /** + * Creates a new hash table with the default initial capacity. + */ + public KeyMap() { + this(0); + } + + /** + * Creates a new table with a capacity tailored to the given expected number of elements. 
+ * + * @param expectedNumberOfElements The number of elements to tailor the capacity to. + */ + public KeyMap(int expectedNumberOfElements) { + if (expectedNumberOfElements < 0) { + throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements); + } + + // round up to the next power or two + // guard against too small capacity and integer overflows + int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1; + capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY; + + // this also acts as a sanity check + log2size = MathUtils.log2strict(capacity); + shift = FULL_BIT_RANGE - log2size; + table = allocateTable(capacity); + rehashThreshold = getRehashThreshold(capacity); + } + + // ------------------------------------------------------------------------ + // Gets and Puts + // ------------------------------------------------------------------------ + + /** + * Inserts the given value, mapped under the given key. If the table already contains a value for + * the key, the value is replaced and returned. If no value is contained, yet, the function + * returns null. + * + * @param key The key to insert. + * @param value The value to insert. + * @return The previously mapped value for the key, or null, if no value was mapped for the key. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public final V put(K key, V value) { + final int hash = hash(key); + final int slot = indexOf (hash); + + // search the chain from the slot + for (Entry<K, V> e = table[slot]; e != null; e = e.next) { + Object k; + if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) { + // found match + V old = e.value; + e.value = value; + return old; + } + } + + // no match, insert a new value + insertNewEntry(hash, key, value, slot); + return null; + } + + /** + * Inserts a value for the given key, if no value is yet contained for that key. Otherwise, + * returns the value currently contained for the key. 
+ * <p> + * The value that is inserted in case that the key is not contained, yet, is lazily created + * using the given factory. + * + * @param key The key to insert. + * @param factory The factory that produces the value, if no value is contained, yet, for the key. + * @return The value in the map after this operation (either the previously contained value, or the + * newly created value). + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public final V putIfAbsent(K key, LazyFactory<V> factory) { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + // found match + return entry.value; + } + } + + // no match, insert a new value + V value = factory.create(); + insertNewEntry(hash, key, value, slot); + + // return the created value + return value; + } + + /** + * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key, + * this method inserts the value. If the table already contains the key (and a value) this + * method will use the given ReduceFunction function to combine the existing value and the + * given value to a new value, and store that value for the key. + * + * @param key The key to map the value. + * @param value The new value to insert, or aggregate with the existing value. + * @param aggregator The aggregator to use if a value is already contained. + * + * @return The value in the map after this operation: Either the given value, or the aggregated value. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + * @throws Exception The method forwards exceptions from the aggregation function. 
+ */ + public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + // found match + entry.value = aggregator.reduce(entry.value, value); + return entry.value; + } + } + + // no match, insert a new value + insertNewEntry(hash, key, value, slot); + // return the original value + return value; + } + + /** + * Looks up the value mapped under the given key. Returns null if no value is mapped under this key. + * + * @param key The key to look up. + * @return The value associated with the key, or null, if no value is found for the key. + * + * @throws java.lang.NullPointerException Thrown, if the key is null. + */ + public V get(K key) { + final int hash = hash(key); + final int slot = indexOf(hash); + + // search the chain from the slot + for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) { + if (entry.hashCode == hash && entry.key.equals(key)) { + return entry.value; + } + } + + // not found + return null; + } + + private void insertNewEntry(int hashCode, K key, V value, int position) { + Entry<K,V> e = table[position]; + table[position] = new Entry<>(key, value, hashCode, e); + numElements++; + + // rehash if necessary + if (numElements > rehashThreshold) { + growTable(); + } + } + + private int indexOf(int hashCode) { + return (hashCode >> shift) & (table.length - 1); + } + + /** + * Creates an iterator over the entries of this map. + * + * @return An iterator over the entries of this map. 
+ */ + @Override + public Iterator<Entry<K, V>> iterator() { + return new Iterator<Entry<K, V>>() { + + private final Entry<K, V>[] tab = KeyMap.this.table; + + private Entry<K, V> nextEntry; + + private int nextPos = 0; + + @Override + public boolean hasNext() { + if (nextEntry != null) { + return true; + } + else { + while (nextPos < tab.length) { + Entry<K, V> e = tab[nextPos++]; + if (e != null) { + nextEntry = e; + return true; + } + } + return false; + } + } + + @Override + public Entry<K, V> next() { + if (nextEntry != null || hasNext()) { + Entry<K, V> e = nextEntry; + nextEntry = nextEntry.next; + return e; + } + else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets the number of elements currently in the map. + * @return The number of elements currently in the map. + */ + public int size() { + return numElements; + } + + /** + * Checks whether the map is empty. + * @return True, if the map is empty, false otherwise. + */ + public boolean isEmpty() { + return numElements == 0; + } + + /** + * Gets the current table capacity, i.e., the number of slots in the hash table, without + * and overflow chaining. + * @return The number of slots in the hash table. + */ + public int getCurrentTableCapacity() { + return table.length; + } + + /** + * Gets the base-2 logarithm of the hash table capacity, as returned by + * {@link #getCurrentTableCapacity()}. + * + * @return The base-2 logarithm of the hash table capacity. 
+ */ + public int getLog2TableCapacity() { + return log2size; + } + + public int getRehashThreshold() { + return rehashThreshold; + } + + public int getShift() { + return shift; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private Entry<K, V>[] allocateTable(int numElements) { + return (Entry<K, V>[]) new Entry<?, ?>[numElements]; + } + + private void growTable() { + final int newSize = table.length << 1; + + // only grow if there is still space to grow the table + if (newSize > 0) { + final Entry<K, V>[] oldTable = this.table; + final Entry<K, V>[] newTable = allocateTable(newSize); + + final int newShift = shift - 1; + final int newMask = newSize - 1; + + // go over all slots from the table. since we hash to adjacent positions in + // the new hash table, this is actually cache efficient + for (Entry<K, V> entry : oldTable) { + // traverse the chain for each slot + while (entry != null) { + final int newPos = (entry.hashCode >> newShift) & newMask; + Entry<K, V> nextEntry = entry.next; + entry.next = newTable[newPos]; + newTable[newPos] = entry; + entry = nextEntry; + } + } + + this.table = newTable; + this.shift = newShift; + this.rehashThreshold = getRehashThreshold(newSize); + this.log2size += 1; + } + } + + private static int hash(Object key) { + int code = key.hashCode(); + + // we need a strong hash function that generates diverse upper bits + // this hash function is more expensive than the "scramble" used by "java.util.HashMap", + // but required for this sort of hash table + code = (code + 0x7ed55d16) + (code << 12); + code = (code ^ 0xc761c23c) ^ (code >>> 19); + code = (code + 0x165667b1) + (code << 5); + code = (code + 0xd3a2646c) ^ (code << 9); + code = (code + 0xfd7046c5) + (code << 3); + return (code ^ 0xb55a4f09) ^ (code >>> 16); + } + + private static int getRehashThreshold(int 
capacity) { + // divide before multiply, to avoid overflow + return capacity / 4 * 3; + } + + // ------------------------------------------------------------------------ + // Testing Utilities + // ------------------------------------------------------------------------ + + /** + * For testing only: Actively counts the number of entries, rather than using the + * counter variable. This method has linear complexity, rather than constant. + * + * @return The counted number of entries. + */ + int traverseAndCountElements() { + int num = 0; + + for (Entry<?, ?> entry : table) { + while (entry != null) { + num++; + entry = entry.next; + } + } + + return num; + } + + /** + * For testing only: Gets the length of the longest overflow chain. + * This method has linear complexity. + * + * @return The length of the longest overflow chain. + */ + int getLongestChainLength() { + int maxLen = 0; + + for (Entry<?, ?> entry : table) { + int thisLen = 0; + while (entry != null) { + thisLen++; + entry = entry.next; + } + maxLen = Math.max(maxLen, thisLen); + } + + return maxLen; + } + + // ------------------------------------------------------------------------ + + /** + * An entry in the hash table. + * + * @param <K> Type of the key. + * @param <V> Type of the value. + */ + public static final class Entry<K, V> { + + final K key; + final int hashCode; + + V value; + Entry<K, V> next; + long touchedTag; + + Entry(K key, V value, int hashCode, Entry<K, V> next) { + this.key = key; + this.value = value; + this.next = next; + this.hashCode = hashCode; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } + + // ------------------------------------------------------------------------ + + /** + * Performs a traversal about logical the multi-map that results from the union of the + * given maps. This method does not actually build a union of the map, but traverses the hash maps + * together. 
+ * + * @param maps The array uf maps whose union should be traversed. + * @param visitor The visitor that is called for each key and all values. + * @param touchedTag A tag that is used to mark elements that have been touched in this specific + * traversal. Each successive traversal should supply a larger value for this + * tag than the previous one. + * + * @param <K> The type of the map's key. + * @param <V> The type of the map's value. + */ + public static <K, V> void traverseMaps( + final KeyMap<K, V>[] maps, + final TraversalEvaluator<K, V> visitor, + final long touchedTag) + throws Exception + { + // we need to work on the maps in descending size + Arrays.sort(maps, CapacityDescendingComparator.INSTANCE); + + final int[] shifts = new int[maps.length]; + final int[] lowBitsMask = new int[maps.length]; + final int numSlots = maps[0].table.length; + final int numTables = maps.length; + + // figure out how much each hash table collapses the entries + for (int i = 0; i < numTables; i++) { + shifts[i] = maps[0].log2size - maps[i].log2size; + lowBitsMask[i] = (1 << shifts[i]) - 1; + } + + // go over all slots (based on the largest hash table) + for (int pos = 0; pos < numSlots; pos++) { + + // for each slot, go over all tables, until the table does not have that slot any more + // for tables where multiple slots collapse into one, we visit that one when we process the + // latest of all slots that collapse to that one + int mask; + for (int rootTable = 0; + rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask; + rootTable++) + { + // use that table to gather keys and start collecting keys from the following tables + // go over all entries of that slot in the table + Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]]; + while (entry != null) { + // take only entries that have not been collected as part of other tables + if (entry.touchedTag < touchedTag) { + entry.touchedTag = touchedTag; + + final K key = entry.key; + final int 
hashCode = entry.hashCode; + visitor.startNewKey(key); + visitor.nextValue(entry.value); + + addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode); + + // go over the other hash tables and collect their entries for the key + for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) { + Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]]; + if (followupEntry != null) { + addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode); + } + } + + visitor.keyDone(); + } + + entry = entry.next; + } + } + } + } + + private static <K, V> void addEntriesFromChain( + Entry<K, V> entry, + TraversalEvaluator<K, V> visitor, + K key, + long touchedTag, + int hashCode) throws Exception + { + while (entry != null) { + if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) { + entry.touchedTag = touchedTag; + visitor.nextValue(entry.value); + } + entry = entry.next; + } + } + + // ------------------------------------------------------------------------ + + /** + * Comparator that defines a descending order on maps depending on their table capacity + * and number of elements. + */ + static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> { + + static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator(); + + private CapacityDescendingComparator() {} + + + @Override + public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) { + // this sorts descending + int cmp = o2.getLog2TableCapacity() - o1.getLog2TableCapacity(); + if (cmp != 0) { + return cmp; + } + else { + return o2.size() - o1.size(); + } + } + } + + // ------------------------------------------------------------------------ + + /** + * A factory for lazy/on-demand instantiation of values. + * + * @param <V> The type created by the factory. + */ + public static interface LazyFactory<V> { + + /** + * The factory method; creates the value. + * @return The value. 
+ */ + V create(); + } + + // ------------------------------------------------------------------------ + + /** + * A visitor for a traversal over the union of multiple hash maps. The visitor is + * called for each key in the union of the maps and all values associated with that key + * (one per map, but multiple across maps). + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ + public static interface TraversalEvaluator<K, V> { + + /** + * Called whenever the traversal starts with a new key. + * + * @param key The key traversed. + * @throws Exception Method forwards all exceptions. + */ + void startNewKey(K key) throws Exception; + + /** + * Called for each value found for the current key. + * + * @param value The next value. + * @throws Exception Method forwards all exceptions. + */ + void nextValue(V value) throws Exception; + + /** + * Called when the traversal for the current key is complete. + * + * @throws Exception Method forwards all exceptions. + */ + void keyDone() throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java new file mode 100644 index 0000000..b34d0bc --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime; +import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime; +import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; + +/** + * This class implements the conversion from window policies to concrete operator + * implementations. + */ +public class PolicyToOperator { + + /** + * Entry point to create an operator for the given window policies and the window function. 
+ */ + public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies( + WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector) + { + if (window == null || function == null) { + throw new NullPointerException(); + } + + // -- case 1: both policies are processing time policies + if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) { + final long windowLength = ((ProcessingTime) window).toMilliseconds(); + final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds(); + + if (function instanceof ReduceFunction) { + @SuppressWarnings("unchecked") + ReduceFunction<IN> reducer = (ReduceFunction<IN>) function; + + @SuppressWarnings("unchecked") + OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>) + new AggregatingProcessingTimeWindowOperator<KEY, IN>( + reducer, keySelector, windowLength, windowSlide); + return op; + } + else if (function instanceof KeyedWindowFunction) { + @SuppressWarnings("unchecked") + KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function; + + return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>( + wf, keySelector, windowLength, windowSlide); + } + } + + // -- case 2: both policies are event time policies + if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) { + // add event time implementation + } + + throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide)); + } + + // ------------------------------------------------------------------------ + + /** Don't instantiate */ + private PolicyToOperator() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java ---------------------------------------------------------------------- 
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java new file mode 100644 index 0000000..55749a1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the operators that implement the various window operations + * on data streams. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing; http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java deleted file mode 100644 index 2e926bc..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.commons.math3.util.ArithmeticUtils; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.util.MathUtils; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - - -public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT>, Triggerable { - - private static final long serialVersionUID = 3245500864882459867L; - - private static final long MIN_SLIDE_TIME = 50; - - // ----- fields for operator parametrization ----- - - private final Function function; - private final KeySelector<IN, KEY> keySelector; - - private final long windowSize; - private final long windowSlide; - private final long paneSize; - private final int numPanesPerWindow; - - // ----- fields for operator functionality ----- - - private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes; - - private transient TimestampedCollector<OUT> out; - - private transient long nextEvaluationTime; - private transient long nextSlideTime; - - protected AbstractAlignedProcessingTimeWindowOperator( - Function function, - KeySelector<IN, KEY> keySelector, - long windowLength, - long windowSlide) - { - if (function == null || keySelector == null) { - throw new NullPointerException(); - } - if (windowLength < MIN_SLIDE_TIME) { - throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs"); - } - 
if (windowSlide < MIN_SLIDE_TIME) { - throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs"); - } - if (windowLength < windowSlide) { - throw new IllegalArgumentException("The window size must be larger than the window slide"); - } - - final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide); - if (paneSlide < MIN_SLIDE_TIME) { - throw new IllegalArgumentException(String.format( - "Cannot compute window of size %d msecs sliding by %d msecs. " + - "The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide)); - } - - this.function = function; - this.keySelector = keySelector; - this.windowSize = windowLength; - this.windowSlide = windowSlide; - this.paneSize = paneSlide; - this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide); - } - - - protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes( - KeySelector<IN, KEY> keySelector, Function function); - - // ------------------------------------------------------------------------ - // startup and shutdown - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration parameters) throws Exception { - out = new TimestampedCollector<>(output); - - // create the panes that gather the elements per slide - panes = createPanes(keySelector, function); - - // decide when to first compute the window and when to slide it - // the values should align with the start of time (that is, the UNIX epoch, not the big bang) - final long now = System.currentTimeMillis(); - nextEvaluationTime = now + windowSlide - (now % windowSlide); - nextSlideTime = now + paneSize - (now % paneSize); - - getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this); - } - - @Override - public void close() throws Exception { - final long finalWindowTimestamp = nextEvaluationTime; - - // early stop the triggering thread, so it does not attempt to return any 
more data - stopTriggers(); - - // emit the remaining data - computeWindow(finalWindowTimestamp); - } - - @Override - public void dispose() { - // acquire the lock during shutdown, to prevent trigger calls at the same time - // fail-safe stop of the triggering thread (in case of an error) - stopTriggers(); - - // release the panes - panes.dispose(); - } - - private void stopTriggers() { - // reset the action timestamps. this makes sure any pending triggers will not evaluate - nextEvaluationTime = -1L; - nextSlideTime = -1L; - } - - // ------------------------------------------------------------------------ - // Receiving elements and triggers - // ------------------------------------------------------------------------ - - @Override - public void processElement(StreamRecord<IN> element) throws Exception { - panes.addElementToLatestPane(element.getValue()); - } - - @Override - public void processWatermark(Watermark mark) { - // this operator does not react to watermarks - } - - @Override - public void trigger(long timestamp) throws Exception { - // first we check if we actually trigger the window function - if (timestamp == nextEvaluationTime) { - // compute and output the results - computeWindow(timestamp); - - nextEvaluationTime += windowSlide; - } - - // check if we slide the panes by one. 
this may happen in addition to the - // window computation, or just by itself - if (timestamp == nextSlideTime) { - panes.slidePanes(numPanesPerWindow); - nextSlideTime += paneSize; - } - - long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - getRuntimeContext().registerTimer(nextTriggerTime, this); - } - - private void computeWindow(long timestamp) throws Exception { - out.setTimestamp(timestamp); - panes.truncatePanes(numPanesPerWindow); - panes.evaluateWindow(out); - } - - // ------------------------------------------------------------------------ - // Property access (for testing) - // ------------------------------------------------------------------------ - - public long getWindowSize() { - return windowSize; - } - - public long getWindowSlide() { - return windowSlide; - } - - public long getPaneSize() { - return paneSize; - } - - public int getNumPanesPerWindow() { - return numPanesPerWindow; - } - - public long getNextEvaluationTime() { - return nextEvaluationTime; - } - - public long getNextSlideTime() { - return nextSlideTime; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java deleted file mode 100644 
index a49b2e6..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.flink.util.Collector; - -import java.util.ArrayDeque; - - -public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> { - - protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>(); - - protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>(); - - // ------------------------------------------------------------------------ - - public abstract void addElementToLatestPane(Type element) throws Exception; - - public abstract void evaluateWindow(Collector<Result> out) throws Exception; - - - public void dispose() { - // since all is heap data, there is no need to clean up anything - latestPane = null; - previousPanes.clear(); - } - - - public void slidePanes(int panesToKeep) { - if (panesToKeep > 1) { - // the current pane becomes the latest previous pane - previousPanes.addLast(latestPane); - - // truncate the history - while (previousPanes.size() >= panesToKeep) { - previousPanes.removeFirst(); - } - } - - // we need a new latest pane - latestPane = new KeyMap<>(); - } - - public void truncatePanes(int numToRetain) { - while (previousPanes.size() >= numToRetain) { - previousPanes.removeFirst(); - } - } - - protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{ - // gather all panes in an array (faster iterations) - @SuppressWarnings({"unchecked", "rawtypes"}) - KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]); - panes[panes.length - 1] = latestPane; - - // let the maps make a coordinated traversal and evaluate the window function per contained key - KeyMap.traverseMaps(panes, traversal, traversalPass); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java deleted file mode 100644 index 1212123..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.runtime.util.UnionIterator; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; - - -public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> { - - private final KeySelector<Type, Key> keySelector; - - private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - - private final KeyedWindowFunction<Type, Result, Key> function; - - private long evaluationPass; - - // ------------------------------------------------------------------------ - - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) { - this.keySelector = keySelector; - this.function = function; - } - - // ------------------------------------------------------------------------ - - @Override - public void addElementToLatestPane(Type element) throws Exception { - Key k = keySelector.getKey(element); - ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory); - elements.add(element); - } - - @Override - public void evaluateWindow(Collector<Result> out) throws Exception { - if (previousPanes.isEmpty()) { - // optimized path for single pane case (tumbling window) - for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { - function.evaluate(entry.getKey(), entry.getValue(), out); - } - } - else { - // general code path for multi-pane case - WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out); - traverseAllPanes(evaluator, evaluationPass); - } - - evaluationPass++; - } - - // ------------------------------------------------------------------------ - // Running a window function in a map traversal - // 
------------------------------------------------------------------------ - - static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - - private final KeyedWindowFunction<Type, Result, Key> function; - - private final UnionIterator<Type> unionIterator; - - private final Collector<Result> out; - - private Key currentKey; - - WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) { - this.function = function; - this.out = out; - this.unionIterator = new UnionIterator<>(); - } - - - @Override - public void startNewKey(Key key) { - unionIterator.clear(); - currentKey = key; - } - - @Override - public void nextValue(ArrayList<Type> value) { - unionIterator.addList(value); - } - - @Override - public void keyDone() throws Exception { - function.evaluate(currentKey, unionIterator, out); - } - } - - // ------------------------------------------------------------------------ - // Lazy factory for lists (put if absent) - // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") - private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() { - return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY; - } - - private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() { - - @Override - public ArrayList<?> create() { - return new ArrayList<>(4); - } - }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java deleted file mode 100644 index fb9d163..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction; - - -public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> { - - private static final long serialVersionUID = 7305948082830843475L; - - - public AccumulatingProcessingTimeWindowOperator( - KeyedWindowFunction<IN, OUT, KEY> function, - KeySelector<IN, KEY> keySelector, - long windowLength, - long windowSlide) - { - super(function, keySelector, windowLength, windowSlide); - } - - @Override - protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) { - @SuppressWarnings("unchecked") - KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function; - - return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java deleted file mode 100644 index 730c984..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.util.Collector; - - -public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> { - - private final KeySelector<Type, Key> keySelector; - - private final ReduceFunction<Type> reducer; - - private long evaluationPass; - - // ------------------------------------------------------------------------ - - public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) { - this.keySelector = keySelector; - this.reducer = reducer; - } - - // ------------------------------------------------------------------------ - - @Override - public void addElementToLatestPane(Type element) throws Exception { - Key k = keySelector.getKey(element); - latestPane.putOrAggregate(k, element, reducer); - } - - @Override - public void evaluateWindow(Collector<Type> out) throws Exception { - if (previousPanes.isEmpty()) { - // optimized path for single pane case - for (KeyMap.Entry<Key, Type> entry : latestPane) { - out.collect(entry.getValue()); - } - } - else { - // general code path for multi-pane case 
- AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out); - traverseAllPanes(evaluator, evaluationPass); - } - - evaluationPass++; - } - - // ------------------------------------------------------------------------ - // The maps traversal that performs the final aggregation - // ------------------------------------------------------------------------ - - static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> { - - private final ReduceFunction<Type> function; - - private final Collector<Type> out; - - private Type currentValue; - - AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) { - this.function = function; - this.out = out; - } - - @Override - public void startNewKey(Key key) { - currentValue = null; - } - - @Override - public void nextValue(Type value) throws Exception { - if (currentValue != null) { - currentValue = function.reduce(currentValue, value); - } - else { - currentValue = value; - } - } - - @Override - public void keyDone() throws Exception { - out.collect(currentValue); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java deleted file mode 100644 index 8bed749..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windows; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; - -public class AggregatingProcessingTimeWindowOperator<KEY, IN> - extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> { - - private static final long serialVersionUID = 7305948082830843475L; - - - public AggregatingProcessingTimeWindowOperator( - ReduceFunction<IN> function, - KeySelector<IN, KEY> keySelector, - long windowLength, - long windowSlide) - { - super(function, keySelector, windowLength, windowSlide); - } - - @Override - protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) { - @SuppressWarnings("unchecked") - ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function; - - return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction); - } -}
