Repository: flink Updated Branches: refs/heads/master d8ed58b6a -> 1ebd44a63
[hotfix] Remove leftover KeyedTimePanes Recently, the aligned window operators were removed; these classes were left over after that removal. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68a99d7a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68a99d7a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68a99d7a Branch: refs/heads/master Commit: 68a99d7ab7ff10c3c0b6cd19babedbfdbfc31354 Parents: d8ed58b Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Sep 25 10:59:39 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Sep 25 10:59:39 2017 +0200 ---------------------------------------------------------------------- .../windowing/AbstractKeyedTimePanes.java | 156 ------------- .../windowing/AccumulatingKeyedTimePanes.java | 224 ------------------- .../windowing/AggregatingKeyedTimePanes.java | 119 ---------- 3 files changed, 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java deleted file mode 100644 index f815107..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Iterator; - -/** - * Base class for a multiple key/value maps organized in panes. - */ -@Internal -public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> { - - private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42; - - private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5; - - /** The latest time pane. */ - protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>(); - - /** The previous time panes, ordered by time (early to late). 
*/ - protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>(); - - // ------------------------------------------------------------------------ - - public abstract void addElementToLatestPane(Type element) throws Exception; - - public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception; - - public void dispose() { - // since all is heap data, there is no need to clean up anything - latestPane = null; - previousPanes.clear(); - } - - public int getNumPanes() { - return previousPanes.size() + 1; - } - - public void slidePanes(int panesToKeep) { - if (panesToKeep > 1) { - // the current pane becomes the latest previous pane - previousPanes.addLast(latestPane); - - // truncate the history - while (previousPanes.size() >= panesToKeep) { - previousPanes.removeFirst(); - } - } - - // we need a new latest pane - latestPane = new KeyMap<>(); - } - - public void truncatePanes(int numToRetain) { - while (previousPanes.size() >= numToRetain) { - previousPanes.removeFirst(); - } - } - - protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{ - // gather all panes in an array (faster iterations) - @SuppressWarnings({"unchecked", "rawtypes"}) - KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]); - panes[panes.length - 1] = latestPane; - - // let the maps make a coordinated traversal and evaluate the window function per contained key - KeyMap.traverseMaps(panes, traversal, traversalPass); - } - - // ------------------------------------------------------------------------ - // Serialization and de-serialization - // ------------------------------------------------------------------------ - - public void writeToOutput( - final DataOutputView output, - final TypeSerializer<Key> keySerializer, - final TypeSerializer<Aggregate> aggSerializer) throws IOException { - 
output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER); - - int numPanes = getNumPanes(); - output.writeInt(numPanes); - - // write from the past - Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator(); - for (int paneNum = 0; paneNum < numPanes; paneNum++) { - output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER); - KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next(); - - output.writeInt(pane.size()); - for (KeyMap.Entry<Key, Aggregate> entry : pane) { - keySerializer.serialize(entry.getKey(), output); - aggSerializer.serialize(entry.getValue(), output); - } - } - } - - public void readFromInput( - final DataInputView input, - final TypeSerializer<Key> keySerializer, - final TypeSerializer<Aggregate> aggSerializer) throws IOException { - validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt()); - int numPanes = input.readInt(); - - // read from the past towards the presence - while (numPanes > 0) { - validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt()); - KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>(); - - final int numElementsInPane = input.readInt(); - for (int i = numElementsInPane - 1; i >= 0; i--) { - Key k = keySerializer.deserialize(input); - Aggregate a = aggSerializer.deserialize(input); - pane.put(k, a); - } - - if (numPanes > 1) { - previousPanes.addLast(pane); - } - numPanes--; - } - } - - private static void validateMagicNumber(int expected, int found) throws IOException { - if (expected != found) { - throw new IOException("Corrupt state stream - wrong magic number. 
" + - "Expected '" + Integer.toHexString(expected) + - "', found '" + Integer.toHexString(found) + '\''); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java deleted file mode 100644 index 6892aaa..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.KeyedStateStore; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.UnionIterator; - -import java.util.ArrayList; - -/** - * Key/value map organized in panes for accumulating windows (with a window function). - */ -@Internal -public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> { - - private final KeySelector<Type, Key> keySelector; - - private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory(); - - private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function; - - private final AccumulatingKeyedTimePanesContext context; - - /** - * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries - * have (zero). 
- */ - private long evaluationPass = 1L; - - // ------------------------------------------------------------------------ - - public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) { - this.keySelector = keySelector; - this.function = function; - this.context = new AccumulatingKeyedTimePanesContext(); - } - - // ------------------------------------------------------------------------ - - @Override - public void addElementToLatestPane(Type element) throws Exception { - Key k = keySelector.getKey(element); - ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory); - elements.add(element); - } - - @Override - public void evaluateWindow(Collector<Result> out, final TimeWindow window, - AbstractStreamOperator<Result> operator) throws Exception { - if (previousPanes.isEmpty()) { - // optimized path for single pane case (tumbling window) - for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { - Key key = entry.getKey(); - operator.setCurrentKey(key); - context.globalState = operator.getKeyedStateStore(); - - function.process(entry.getKey(), window, context, entry.getValue(), out); - } - } - else { - // general code path for multi-pane case - WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>( - function, window, out, operator, context); - traverseAllPanes(evaluator, evaluationPass); - } - - evaluationPass++; - } - - // ------------------------------------------------------------------------ - // Running a window function in a map traversal - // ------------------------------------------------------------------------ - - static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> { - - private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function; - - private final UnionIterator<Type> unionIterator; - - private final Collector<Result> out; - - private final 
TimeWindow window; - - private final AbstractStreamOperator<Result> contextOperator; - - private Key currentKey; - - private AccumulatingKeyedTimePanesContext context; - - WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window, - Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) { - this.function = function; - this.out = out; - this.unionIterator = new UnionIterator<>(); - this.window = window; - this.contextOperator = contextOperator; - this.context = context; - } - - @Override - public void startNewKey(Key key) { - unionIterator.clear(); - currentKey = key; - } - - @Override - public void nextValue(ArrayList<Type> value) { - unionIterator.addList(value); - } - - @Override - public void keyDone() throws Exception { - contextOperator.setCurrentKey(currentKey); - context.globalState = contextOperator.getKeyedStateStore(); - function.process(currentKey, window, context, unionIterator, out); - } - } - - // ------------------------------------------------------------------------ - // Lazy factory for lists (put if absent) - // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") - private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() { - return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY; - } - - private static class ThrowingKeyedStateStore implements KeyedStateStore { - @Override - public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); - } - - @Override - public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); - } - - @Override - public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) { - throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); - } - - @Override - public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) { - throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); - } - - @Override - public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { - throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows."); - } - } - - private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext { - KeyedStateStore globalState; - KeyedStateStore throwingStore; - - public AccumulatingKeyedTimePanesContext() { - this.throwingStore = new ThrowingKeyedStateStore(); - } - - @Override - public long currentProcessingTime() { - throw new UnsupportedOperationException("current processing time is not supported in this context"); - } - - @Override - public long currentWatermark() { - throw new UnsupportedOperationException("current watermark is not supported in this context"); - } - - @Override - public KeyedStateStore windowState() { - return throwingStore; - } - - @Override - public KeyedStateStore globalState() { - return globalState; - } - } - - private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() { - - @Override - public ArrayList<?> create() { - return new ArrayList<>(4); - } - }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java deleted file mode 100644 index 66d41f1..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -/** - * Key/value map organized in panes for aggregating windows (with a reduce function). 
- */ -@Internal -public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> { - - private final KeySelector<Type, Key> keySelector; - - private final ReduceFunction<Type> reducer; - - /** - * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries - * have (zero). - */ - private long evaluationPass = 1L; - - // ------------------------------------------------------------------------ - - public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) { - this.keySelector = keySelector; - this.reducer = reducer; - } - - // ------------------------------------------------------------------------ - - @Override - public void addElementToLatestPane(Type element) throws Exception { - Key k = keySelector.getKey(element); - latestPane.putOrAggregate(k, element, reducer); - } - - @Override - public void evaluateWindow(Collector<Type> out, TimeWindow window, - AbstractStreamOperator<Type> operator) throws Exception { - if (previousPanes.isEmpty()) { - // optimized path for single pane case - for (KeyMap.Entry<Key, Type> entry : latestPane) { - out.collect(entry.getValue()); - } - } - else { - // general code path for multi-pane case - AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator); - traverseAllPanes(evaluator, evaluationPass); - } - - evaluationPass++; - } - - // ------------------------------------------------------------------------ - // The maps traversal that performs the final aggregation - // ------------------------------------------------------------------------ - - static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> { - - private final ReduceFunction<Type> function; - - private final Collector<Type> out; - - private final AbstractStreamOperator<Type> operator; - - private Type currentValue; - - AggregatingTraversal(ReduceFunction<Type> function, 
Collector<Type> out, - AbstractStreamOperator<Type> operator) { - this.function = function; - this.out = out; - this.operator = operator; - } - - @Override - public void startNewKey(Key key) { - currentValue = null; - operator.setCurrentKey(key); - } - - @Override - public void nextValue(Type value) throws Exception { - if (currentValue != null) { - currentValue = function.reduce(currentValue, value); - } - else { - currentValue = value; - } - } - - @Override - public void keyDone() throws Exception { - out.collect(currentValue); - } - } -}