Repository: flink
Updated Branches:
  refs/heads/master 884d3e2a4 -> 12bf7c1a0
[FLINK-4207] WindowOperator becomes very slow with allowed lateness

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12bf7c1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12bf7c1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12bf7c1a

Branch: refs/heads/master
Commit: 12bf7c1a0b81d199085fe874c64763c51a93b3bf
Parents: 884d3e2
Author: kl0u <kklou...@gmail.com>
Authored: Mon Jul 18 11:37:06 2016 +0200
Committer: kl0u <kklou...@gmail.com>
Committed: Tue Jul 26 21:12:05 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java    |   2 +-
 .../streaming/state/RocksDBListState.java       |   3 +-
 .../flink/api/common/state/AppendingState.java  |  10 +-
 .../flink/runtime/state/GenericListState.java   |   7 +-
 .../state/filesystem/FsFoldingState.java        |   7 +-
 .../runtime/state/filesystem/FsListState.java   |  13 +-
 .../runtime/state/memory/MemFoldingState.java   |   7 +-
 .../runtime/state/memory/MemListState.java      |  13 +-
 .../runtime/state/StateBackendTestBase.java     |   8 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   2 +-
 .../windowing/EvictingWindowOperator.java       |  75 +++-
 .../operators/windowing/MergingWindowSet.java   |  14 +-
 .../operators/windowing/WindowOperator.java     |  74 +++-
 .../windowing/MergingWindowSetTest.java         |   7 +-
 .../operators/windowing/WindowOperatorTest.java | 438 +++++++++++++++++++
 .../sessionwindows/SessionWindowITCase.java     |   2 -
 16 files changed, 580 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 8ffe3a6..218fa2a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -93,7 +93,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 			byte[] key = baos.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
-				return stateDesc.getDefaultValue();
+				return null;
 			}
 			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 		} catch (IOException|RocksDBException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index b13c5ae..ce3a48e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -32,7 +32,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static java.util.Objects.requireNonNull; @@ -94,7 +93,7 @@ public class RocksDBListState<K, N, V> byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { - return Collections.emptyList(); + return null; } ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes); http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java index 04dc784..8ea8364 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java @@ -46,8 +46,14 @@ public interface AppendingState<IN, OUT> extends State { * operator instance. If state partitioning is applied, the value returned * depends on the current operator input, as the operator maintains an * independent state for each partition. - * - * @return The operator state value corresponding to the current input. + * + * <p> + * <b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method + * should return {@code null}. + * </p> + * + * @return The operator state value corresponding to the current input or {@code null} + * if the state is empty. * * @throws Exception Thrown if the system cannot access the state. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java index 3414855..2e40898 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; /** * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}. 
@@ -82,11 +81,7 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e @Override public Iterable<T> get() throws Exception { - ArrayList<T> result = wrappedState.value(); - if (result == null) { - return Collections.emptyList(); - } - return result; + return wrappedState.value(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java index bba6df5..90baf36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java @@ -88,11 +88,8 @@ public class FsFoldingState<K, N, T, ACC> if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - ACC value = currentNSState.get(currentKey); - return value != null ? value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java index 1d5b5f8..46c9830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java @@ -27,9 +27,7 @@ import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -85,15 +83,8 @@ public class FsListState<K, N, V> if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - List<V> result = currentNSState.get(currentKey); - if (result == null) { - return Collections.emptyList(); - } else { - return result; - } - } - return Collections.emptyList(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java index 07b677b..9953a64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java @@ -64,11 +64,8 @@ public class MemFoldingState<K, N, T, ACC> if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - ACC value = currentNSState.get(currentKey); - return value != null ? 
value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java index d5e4dfd..97461d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java @@ -26,9 +26,7 @@ import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -56,15 +54,8 @@ public class MemListState<K, N, V> if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - List<V> result = currentNSState.get(currentKey); - if (result == null) { - return Collections.emptyList(); - } else { - return result; - } - } - return Collections.emptyList(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 12cf112..80f1de3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -243,10 +243,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { Joiner joiner = Joiner.on(","); // some modifications to the state backend.setCurrentKey(1); - assertEquals("", joiner.join(state.get())); + assertEquals(null, state.get()); state.add("1"); backend.setCurrentKey(2); - assertEquals("", joiner.join(state.get())); + assertEquals(null, state.get()); state.add("2"); backend.setCurrentKey(1); assertEquals("1", joiner.join(state.get())); @@ -438,10 +438,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // some modifications to the state backend.setCurrentKey(1); - assertEquals("Fold-Initial:", state.get()); + assertEquals(null, state.get()); state.add(1); backend.setCurrentKey(2); - assertEquals("Fold-Initial:", state.get()); + assertEquals(null, state.get()); state.add(2); backend.setCurrentKey(1); assertEquals("Fold-Initial:,1", state.get()); http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index 85d0b52..8b30130 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -72,7 +72,7 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> { @Override public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx); - return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : triggerResult; + return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 3f2c6a3..15f716c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -132,12 +132,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // check if the window is already inactive if (isLate(actualWindow)) { - LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness."); mergingWindows.retireWindow(actualWindow); continue; } W stateWindow = mergingWindows.getStateWindow(actualWindow); + if (stateWindow == null) { + throw new IllegalStateException("Window " + window + " is not in in-flight window set."); + } ListState<StreamRecord<IN>> windowState = getPartitionedState( stateWindow, windowSerializer, windowStateDescriptor); windowState.add(element); @@ -149,7 +151,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - fireOrContinue(combinedTriggerResult, actualWindow, windowState); + + if (combinedTriggerResult.isFire()) { + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + fire(actualWindow, contents); + } if (combinedTriggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); @@ -163,7 +173,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // check if the window is already inactive if (isLate(window)) { - LOG.info("Dropped element " + element + " for window " + window + " due to lateness."); continue; } @@ -175,7 +184,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.window = window; TriggerResult triggerResult = context.onElement(element); - fireOrContinue(triggerResult, window, windowState); + + if (triggerResult.isFire()) { + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + fire(window, contents); + } if (triggerResult.isPurge()) { cleanup(window, windowState, null); @@ -207,16 +224,30 
@@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onEventTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -255,16 +286,30 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -273,15 +318,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } while (fire); } - private void fireOrContinue(TriggerResult triggerResult, - W window, - ListState<StreamRecord<IN>> windowState) throws Exception { - if (!triggerResult.isFire()) { - return; - } - + private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); - Iterable<StreamRecord<IN>> contents = windowState.get(); // Work around type system restrictions... 
int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); @@ -307,7 +345,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window mergingWindows.retireWindow(window); } context.clear(); - deleteCleanupTimer(window); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java index d02a348..c806d2d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java @@ -80,8 +80,11 @@ public class MergingWindowSet<W extends Window> { this.windowAssigner = windowAssigner; windows = new HashMap<>(); - for (Tuple2<W, W> window: state.get()) { - windows.put(window.f0, window.f1); + Iterable<Tuple2<W, W>> windowState = state.get(); + if (windowState != null) { + for (Tuple2<W, W> window: windowState) { + windows.put(window.f0, window.f1); + } } } @@ -100,12 +103,7 @@ public class MergingWindowSet<W extends Window> { * @param window The window for which to get the state window. */ public W getStateWindow(W window) { - W result = windows.get(window); - if (result == null) { - throw new IllegalStateException("Window " + window + " is not in in-flight window set."); - } - - return result; + return windows.get(window); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index bb05d2b..2434843 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -334,12 +334,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // drop if the window is already late if (isLate(actualWindow)) { - LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness."); mergingWindows.retireWindow(actualWindow); continue; } W stateWindow = mergingWindows.getStateWindow(actualWindow); + if (stateWindow == null) { + throw new IllegalStateException("Window " + window + " is not in in-flight window set."); + } + AppendingState<IN, ACC> windowState = getPartitionedState( stateWindow, windowSerializer, windowStateDescriptor); windowState.add(element.getValue()); @@ -351,7 +354,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - 
fireOrContinue(combinedTriggerResult, actualWindow, windowState); + + if (combinedTriggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents == null) { + continue; + } + fire(actualWindow, contents); + } if (combinedTriggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); @@ -364,7 +374,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // drop if the window is already late if (isLate(window)) { - LOG.info("Dropped element " + element + " for window " + window + " due to lateness."); continue; } @@ -376,7 +385,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.window = window; TriggerResult triggerResult = context.onElement(element); - fireOrContinue(triggerResult, window, windowState); + + if (triggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents == null) { + continue; + } + fire(window, contents); + } if (triggerResult.isPurge()) { cleanup(window, windowState, null); @@ -408,16 +424,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onEventTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -456,16 +486,30 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -487,7 +531,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> mergingWindows.retireWindow(window); } context.clear(); - deleteCleanupTimer(window); } /** @@ -495,15 +538,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * The caller must ensure that the correct key is set in the state backend and the context object. */ @SuppressWarnings("unchecked") - private void fireOrContinue(TriggerResult triggerResult, - W window, - AppendingState<IN, ACC> windowState) throws Exception { - if (!triggerResult.isFire()) { - return; - } - + private void fire(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); - ACC contents = windowState.get(); userFunction.apply(context.key, context.window, contents, timestampedCollector); } http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index cf90f8a..939f13f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -133,12 +133,7 @@ public class MergingWindowSetTest { // retire the first batch of windows windowSet.retireWindow(new TimeWindow(0, 6)); - try { - windowSet.getStateWindow(new TimeWindow(0, 6)); - fail("Expected exception"); - } catch (IllegalStateException e) { - //ignore - } + assertTrue(windowSet.getStateWindow(new TimeWindow(0, 6)) == null); assertTrue(windowSet.getStateWindow(new TimeWindow(10, 15)).equals(new TimeWindow(11, 14))); } http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 90bd3f2..62266c4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -17,9 +17,12 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -51,6 +54,8 @@ import 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -1952,10 +1957,379 @@ public class WindowOperatorTest { testHarness.close(); } + @Test + public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 100; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction2()), + new EventTimeTriggerAccumGC(LATENESS), + LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2100)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>("GOT: (key2,1)", 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2100)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { + out.collect("GOT: " + Joiner.on(",").join(input)); + } + } + + @Test + public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new 
ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + 
expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc = + new FoldingStateDescriptor<>( + "window-contents", + new Tuple2<>((String) null, 0), + new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception { + return new Tuple2<>(value.f0, accumulator.f1 + value.f1); + } + }, + inputType); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception { + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + 
LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue<Object> actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception { + + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Long, Long>, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), + EventTimeTrigger.create(), + LATENESS); + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 4000L), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue<Object> actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception { + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + FoldingStateDescriptor<Tuple2<String, Integer>, Tuple2<String, Integer>> windowStateDesc = + new FoldingStateDescriptor<>( + "window-contents", + new Tuple2<>((String) null, 0), + new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, 
Tuple2<String, Integer> value) throws Exception { + return new Tuple2<>(value.f0, accumulator.f1 + value.f1); + } + }, + inputType); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue<Object> actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ + private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { + for (Tuple2<String, Integer> in: input) { + out.collect(in); + } + } + } + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override @@ -2112,4 +2486,68 @@ public class WindowOperatorTest { return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } } + + /** + * A trigger that fires at the end of the window but does not + * purge the state of the fired window. This is to test the state + * garbage collection mechanism. + */ + public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> { + private static final long serialVersionUID = 1L; + + private long cleanupTime; + + private EventTimeTriggerAccumGC() { + cleanupTime = 0L; + } + + public EventTimeTriggerAccumGC(long cleanupTime) { + this.cleanupTime = cleanupTime; + } + + @Override + public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { + if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + // if the watermark is already past the window fire immediately + return TriggerResult.FIRE; + } else { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { + return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime ? 
+ TriggerResult.FIRE_AND_PURGE : + TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception { + ctx.deleteEventTimeTimer(window.maxTimestamp()); + } + + @Override + public boolean canMerge() { + return true; + } + + @Override + public TriggerResult onMerge(TimeWindow window, + OnMergeContext ctx) { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + + @Override + public String toString() { + return "EventTimeTrigger()"; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/12bf7c1a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java index eb137aa..9b4855f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java @@ -22,9 +22,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
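
----------------------------------------------------------------------

The common thread of the patch: the per-backend AppendingState#get() implementations now return null when the state is empty, and WindowOperator / EvictingWindowOperator check for that before firing, so cleanup timers registered for allowed lateness no longer materialize empty lists or folding default values for windows whose state was already purged. The following standalone sketch illustrates that contract with a simplified, hypothetical state interface (plain Java, not the committed Flink code; all class and method names are illustrative only):

import java.util.ArrayList;
import java.util.List;

public class NullWhenEmptyStateSketch {

	// Simplified stand-in for Flink's AppendingState<IN, OUT>.
	interface AppendingState<IN, OUT> {
		// Returns the current contents, or null if the state is empty.
		OUT get();
		void add(IN value);
		void clear();
	}

	// Trivial heap-backed list state obeying the "null when empty" contract.
	static class SimpleListState<T> implements AppendingState<T, Iterable<T>> {
		private final List<T> elements = new ArrayList<>();

		@Override
		public Iterable<T> get() {
			return elements.isEmpty() ? null : elements; // null, not an empty list
		}

		@Override
		public void add(T value) {
			elements.add(value);
		}

		@Override
		public void clear() {
			elements.clear();
		}
	}

	// Mirrors the operator-side pattern: only fire when there is actual content.
	static <T> void fireIfNonEmpty(AppendingState<T, Iterable<T>> windowState) {
		Iterable<T> contents = windowState.get();
		if (contents == null) {
			// no buffered elements, e.g. a lateness cleanup timer for an already purged window
			System.out.println("no state, nothing to do");
			return;
		}
		System.out.println("firing window with contents " + contents);
	}

	public static void main(String[] args) {
		SimpleListState<String> state = new SimpleListState<>();
		fireIfNonEmpty(state); // empty: skipped
		state.add("a");
		state.add("b");
		fireIfNonEmpty(state); // fires with [a, b]
		state.clear();         // corresponds to a purge
		fireIfNonEmpty(state); // empty again: skipped
	}
}

Returning null instead of an empty collection or the fold default keeps the emptiness check cheap and lets the operator skip work for already purged windows, which is the behavior the new testCleanupTimerWithEmpty*State tests in WindowOperatorTest exercise.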