Repository: flink
Updated Branches:
  refs/heads/master d8ed58b6a -> 1ebd44a63


[hotfix] Remove leftover KeyedTimePanes

Recently, the aligned window operators were removes, these classes where
leftover after that removal.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68a99d7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68a99d7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68a99d7a

Branch: refs/heads/master
Commit: 68a99d7ab7ff10c3c0b6cd19babedbfdbfc31354
Parents: d8ed58b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Sep 25 10:59:39 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Sep 25 10:59:39 2017 +0200

----------------------------------------------------------------------
 .../windowing/AbstractKeyedTimePanes.java       | 156 -------------
 .../windowing/AccumulatingKeyedTimePanes.java   | 224 -------------------
 .../windowing/AggregatingKeyedTimePanes.java    | 119 ----------
 3 files changed, 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
deleted file mode 100644
index f815107..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-
-/**
- * Base class for a multiple key/value maps organized in panes.
- */
-@Internal
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-
-       private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
-
-       private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-
-       /** The latest time pane. */
-       protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
-       /** The previous time panes, ordered by time (early to late). */
-       protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new 
ArrayDeque<>();
-
-       // 
------------------------------------------------------------------------
-
-       public abstract void addElementToLatestPane(Type element) throws 
Exception;
-
-       public abstract void evaluateWindow(Collector<Result> out, TimeWindow 
window, AbstractStreamOperator<Result> operator) throws Exception;
-
-       public void dispose() {
-               // since all is heap data, there is no need to clean up anything
-               latestPane = null;
-               previousPanes.clear();
-       }
-
-       public int getNumPanes() {
-               return previousPanes.size() + 1;
-       }
-
-       public void slidePanes(int panesToKeep) {
-               if (panesToKeep > 1) {
-                       // the current pane becomes the latest previous pane
-                       previousPanes.addLast(latestPane);
-
-                       // truncate the history
-                       while (previousPanes.size() >= panesToKeep) {
-                               previousPanes.removeFirst();
-                       }
-               }
-
-               // we need a new latest pane
-               latestPane = new KeyMap<>();
-       }
-
-       public void truncatePanes(int numToRetain) {
-               while (previousPanes.size() >= numToRetain) {
-                       previousPanes.removeFirst();
-               }
-       }
-
-       protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, 
Aggregate> traversal, long traversalPass) throws Exception{
-               // gather all panes in an array (faster iterations)
-               @SuppressWarnings({"unchecked", "rawtypes"})
-               KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new 
KeyMap[previousPanes.size() + 1]);
-               panes[panes.length - 1] = latestPane;
-
-               // let the maps make a coordinated traversal and evaluate the 
window function per contained key
-               KeyMap.traverseMaps(panes, traversal, traversalPass);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Serialization and de-serialization
-       // 
------------------------------------------------------------------------
-
-       public void writeToOutput(
-                       final DataOutputView output,
-                       final TypeSerializer<Key> keySerializer,
-                       final TypeSerializer<Aggregate> aggSerializer) throws 
IOException {
-               output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
-
-               int numPanes = getNumPanes();
-               output.writeInt(numPanes);
-
-               // write from the past
-               Iterator<KeyMap<Key, Aggregate>> previous = 
previousPanes.iterator();
-               for (int paneNum = 0; paneNum < numPanes; paneNum++) {
-                       output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
-                       KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) 
? latestPane : previous.next();
-
-                       output.writeInt(pane.size());
-                       for (KeyMap.Entry<Key, Aggregate> entry : pane) {
-                               keySerializer.serialize(entry.getKey(), output);
-                               aggSerializer.serialize(entry.getValue(), 
output);
-                       }
-               }
-       }
-
-       public void readFromInput(
-                       final DataInputView input,
-                       final TypeSerializer<Key> keySerializer,
-                       final TypeSerializer<Aggregate> aggSerializer) throws 
IOException {
-               validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, 
input.readInt());
-               int numPanes = input.readInt();
-
-               // read from the past towards the presence
-               while (numPanes > 0) {
-                       validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, 
input.readInt());
-                       KeyMap<Key, Aggregate> pane = (numPanes == 1) ? 
latestPane : new KeyMap<Key, Aggregate>();
-
-                       final int numElementsInPane = input.readInt();
-                       for (int i = numElementsInPane - 1; i >= 0; i--) {
-                               Key k = keySerializer.deserialize(input);
-                               Aggregate a = aggSerializer.deserialize(input);
-                               pane.put(k, a);
-                       }
-
-                       if (numPanes > 1) {
-                               previousPanes.addLast(pane);
-                       }
-                       numPanes--;
-               }
-       }
-
-       private static void validateMagicNumber(int expected, int found) throws 
IOException {
-               if (expected != found) {
-                       throw new IOException("Corrupt state stream - wrong 
magic number. " +
-                               "Expected '" + Integer.toHexString(expected) +
-                               "', found '" + Integer.toHexString(found) + 
'\'');
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index 6892aaa..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyedStateStore;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.UnionIterator;
-
-import java.util.ArrayList;
-
-/**
- * Key/value map organized in panes for accumulating windows (with a window 
function).
- */
-@Internal
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends 
AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-
-       private final KeySelector<Type, Key> keySelector;
-
-       private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
-
-       private final InternalWindowFunction<Iterable<Type>, Result, Key, 
Window> function;
-
-       private final AccumulatingKeyedTimePanesContext context;
-
-       /**
-        * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries
-        * have (zero).
-        */
-       private long evaluationPass = 1L;
-
-       // 
------------------------------------------------------------------------
-
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
-               this.keySelector = keySelector;
-               this.function = function;
-               this.context = new AccumulatingKeyedTimePanesContext();
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void addElementToLatestPane(Type element) throws Exception {
-               Key k = keySelector.getKey(element);
-               ArrayList<Type> elements = latestPane.putIfAbsent(k, 
listFactory);
-               elements.add(element);
-       }
-
-       @Override
-       public void evaluateWindow(Collector<Result> out, final TimeWindow 
window,
-                                                               
AbstractStreamOperator<Result> operator) throws Exception {
-               if (previousPanes.isEmpty()) {
-                       // optimized path for single pane case (tumbling window)
-                       for (KeyMap.Entry<Key, ArrayList<Type>> entry : 
latestPane) {
-                               Key key = entry.getKey();
-                               operator.setCurrentKey(key);
-                               context.globalState = 
operator.getKeyedStateStore();
-
-                               function.process(entry.getKey(), window, 
context, entry.getValue(), out);
-                       }
-               }
-               else {
-                       // general code path for multi-pane case
-                       WindowFunctionTraversal<Key, Type, Result> evaluator = 
new WindowFunctionTraversal<>(
-                                       function, window, out, operator, 
context);
-                       traverseAllPanes(evaluator, evaluationPass);
-               }
-
-               evaluationPass++;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Running a window function in a map traversal
-       // 
------------------------------------------------------------------------
-
-       static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
-               private final InternalWindowFunction<Iterable<Type>, Result, 
Key, Window> function;
-
-               private final UnionIterator<Type> unionIterator;
-
-               private final Collector<Result> out;
-
-               private final TimeWindow window;
-
-               private final AbstractStreamOperator<Result> contextOperator;
-
-               private Key currentKey;
-
-               private AccumulatingKeyedTimePanesContext context;
-
-               WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, 
Result, Key, Window> function, TimeWindow window,
-                                                               
Collector<Result> out, AbstractStreamOperator<Result> contextOperator, 
AccumulatingKeyedTimePanesContext context) {
-                       this.function = function;
-                       this.out = out;
-                       this.unionIterator = new UnionIterator<>();
-                       this.window = window;
-                       this.contextOperator = contextOperator;
-                       this.context = context;
-               }
-
-               @Override
-               public void startNewKey(Key key) {
-                       unionIterator.clear();
-                       currentKey = key;
-               }
-
-               @Override
-               public void nextValue(ArrayList<Type> value) {
-                       unionIterator.addList(value);
-               }
-
-               @Override
-               public void keyDone() throws Exception {
-                       contextOperator.setCurrentKey(currentKey);
-                       context.globalState = 
contextOperator.getKeyedStateStore();
-                       function.process(currentKey, window, context, 
unionIterator, out);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Lazy factory for lists (put if absent)
-       // 
------------------------------------------------------------------------
-
-       @SuppressWarnings("unchecked")
-       private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
-               return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
-       }
-
-       private static class ThrowingKeyedStateStore implements KeyedStateStore 
{
-               @Override
-               public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
-                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
-               }
-
-               @Override
-               public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
-                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
-               }
-
-               @Override
-               public <T> ReducingState<T> 
getReducingState(ReducingStateDescriptor<T> stateProperties) {
-                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
-               }
-
-               @Override
-               public <T, A> FoldingState<T, A> 
getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
-                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
-               }
-
-               @Override
-               public <UK, UV> MapState<UK, UV> 
getMapState(MapStateDescriptor<UK, UV> stateProperties) {
-                       throw new UnsupportedOperationException("Per-window 
state is not supported when using aligned processing-time windows.");
-               }
-       }
-
-       private static class AccumulatingKeyedTimePanesContext implements 
InternalWindowFunction.InternalWindowContext {
-               KeyedStateStore globalState;
-               KeyedStateStore throwingStore;
-
-               public AccumulatingKeyedTimePanesContext() {
-                       this.throwingStore = new ThrowingKeyedStateStore();
-               }
-
-               @Override
-               public long currentProcessingTime() {
-                       throw new UnsupportedOperationException("current 
processing time is not supported in this context");
-               }
-
-               @Override
-               public long currentWatermark() {
-                       throw new UnsupportedOperationException("current 
watermark is not supported in this context");
-               }
-
-               @Override
-               public KeyedStateStore windowState() {
-                       return throwingStore;
-               }
-
-               @Override
-               public KeyedStateStore globalState() {
-                       return globalState;
-               }
-       }
-
-       private static final KeyMap.LazyFactory<?> LIST_FACTORY = new 
KeyMap.LazyFactory<ArrayList<?>>() {
-
-               @Override
-               public ArrayList<?> create() {
-                       return new ArrayList<>(4);
-               }
-       };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
deleted file mode 100644
index 66d41f1..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Key/value map organized in panes for aggregating windows (with a reduce 
function).
- */
-@Internal
-public class AggregatingKeyedTimePanes<Type, Key> extends 
AbstractKeyedTimePanes<Type, Key, Type, Type> {
-
-       private final KeySelector<Type, Key> keySelector;
-
-       private final ReduceFunction<Type> reducer;
-
-       /**
-        * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries
-        * have (zero).
-        */
-       private long evaluationPass = 1L;
-
-       // 
------------------------------------------------------------------------
-
-       public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
ReduceFunction<Type> reducer) {
-               this.keySelector = keySelector;
-               this.reducer = reducer;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void addElementToLatestPane(Type element) throws Exception {
-               Key k = keySelector.getKey(element);
-               latestPane.putOrAggregate(k, element, reducer);
-       }
-
-       @Override
-       public void evaluateWindow(Collector<Type> out, TimeWindow window,
-                                                               
AbstractStreamOperator<Type> operator) throws Exception {
-               if (previousPanes.isEmpty()) {
-                       // optimized path for single pane case
-                       for (KeyMap.Entry<Key, Type> entry : latestPane) {
-                               out.collect(entry.getValue());
-                       }
-               }
-               else {
-                       // general code path for multi-pane case
-                       AggregatingTraversal<Key, Type> evaluator = new 
AggregatingTraversal<>(reducer, out, operator);
-                       traverseAllPanes(evaluator, evaluationPass);
-               }
-
-               evaluationPass++;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  The maps traversal that performs the final aggregation
-       // 
------------------------------------------------------------------------
-
-       static final class AggregatingTraversal<Key, Type> implements 
KeyMap.TraversalEvaluator<Key, Type> {
-
-               private final ReduceFunction<Type> function;
-
-               private final Collector<Type> out;
-
-               private final AbstractStreamOperator<Type> operator;
-
-               private Type currentValue;
-
-               AggregatingTraversal(ReduceFunction<Type> function, 
Collector<Type> out,
-                                                               
AbstractStreamOperator<Type> operator) {
-                       this.function = function;
-                       this.out = out;
-                       this.operator = operator;
-               }
-
-               @Override
-               public void startNewKey(Key key) {
-                       currentValue = null;
-                       operator.setCurrentKey(key);
-               }
-
-               @Override
-               public void nextValue(Type value) throws Exception {
-                       if (currentValue != null) {
-                               currentValue = function.reduce(currentValue, 
value);
-                       }
-                       else {
-                               currentValue = value;
-                       }
-               }
-
-               @Override
-               public void keyDone() throws Exception {
-                       out.collect(currentValue);
-               }
-       }
-}

Reply via email to