http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 628d663..9b8af58 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -19,10 +19,10 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.commons.math3.util.ArithmeticUtils; + import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.AppendingState; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MergingState; import org.apache.flink.api.common.state.State; @@ -42,6 +42,9 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalAppendingState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMergingState; import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -107,22 +110,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> protected final WindowAssigner<? super IN, W> windowAssigner; - protected final KeySelector<IN, K> keySelector; - - protected final Trigger<? super IN, ? super W> trigger; + private final KeySelector<IN, K> keySelector; - protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor; + private final Trigger<? super IN, ? super W> trigger; - protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor; + private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor; - /** - * For serializing the key in checkpoints. - */ + /** For serializing the key in checkpoints. */ protected final TypeSerializer<K> keySerializer; - /** - * For serializing the window in checkpoints. - */ + /** For serializing the window in checkpoints. */ protected final TypeSerializer<W> windowSerializer; /** @@ -133,15 +130,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * {@code window.maxTimestamp + allowedLateness} landmark. * </ul> */ - protected final long allowedLateness; + private final long allowedLateness; // ------------------------------------------------------------------------ // State that is not checkpointed // ------------------------------------------------------------------------ - /** - * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. - */ + /** The state in which the window contents is stored. Each window is a namespace */ + private transient InternalAppendingState<W, IN, ACC> windowState; + + /** The {@link #windowState}, typed to merging state for merging windows. + * Null if the window state is not mergeable */ + private transient InternalMergingState<W, IN, ACC> windowMergingState; + + /** The state that holds the merging window metadata (the sets that describe what is merged) */ + private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState; + + /** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */ protected transient TimestampedCollector<OUT> timestampedCollector; protected transient Context context = new Context(null, null); @@ -234,14 +239,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> this.allowedLateness = allowedLateness; this.legacyWindowOperatorType = legacyWindowOperatorType; - if (windowAssigner instanceof MergingWindowAssigner) { - @SuppressWarnings({"unchecked", "rawtypes"}) - TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} ); - mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer); - } else { - mergingWindowsDescriptor = null; - } - setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -263,6 +260,43 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } }; + // create (or restore) the state that hold the actual window contents + // NOTE - the state may be null in the case of the overriding evicting window operator + if (windowStateDescriptor != null) { + windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor); + } + + // create the typed and helper states for merging windows + if (windowAssigner instanceof MergingWindowAssigner) { + + // store a typed reference for the state of merging windows - sanity check + if (windowState instanceof InternalMergingState) { + windowMergingState = (InternalMergingState<W, IN, ACC>) windowState; + } + // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation) + // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows() + // TODO activate the sanity check once resolved +// else if (windowState != null) { +// throw new IllegalStateException( +// "The window uses a merging assigner, but the window state is not mergeable."); +// } + + @SuppressWarnings("unchecked") + final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class; + + final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>( + typedTuple, + new TypeSerializer[] {windowSerializer, windowSerializer} ); + + final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor = + new ListStateDescriptor<>("merging-window-set", tupleSerializer); + + // get the state that stores the merging sets + mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>) + getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor); + mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); + } + registerRestoredLegacyStateState(); } @@ -283,12 +317,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - @SuppressWarnings("unchecked") public void processElement(StreamRecord<IN> element) throws Exception { - Collection<W> elementWindows = windowAssigner.assignWindows( + final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); - - final K key = (K) getKeyedStateBackend().getCurrentKey(); + + final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { MergingWindowSet<W> mergingWindows = getMergingWindowSet(); @@ -315,11 +348,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } // merge the merged state windows into the newly resulting state window - getKeyedStateBackend().mergePartitionedStates( - stateWindowResult, - mergedStateWindows, - windowSerializer, - (StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor); + windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); } }); @@ -334,8 +363,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } - AppendingState<IN, ACC> windowState = - getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(stateWindow); windowState.add(element.getValue()); context.key = key; @@ -368,8 +396,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> continue; } - AppendingState<IN, ACC> windowState = - getPartitionedState(window, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(window); windowState.add(element.getValue()); context.key = key; @@ -399,8 +426,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.getKey(); context.window = timer.getNamespace(); - AppendingState<IN, ACC> windowState; - MergingWindowSet<W> mergingWindows = null; + MergingWindowSet<W> mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); @@ -411,12 +437,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + + windowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState( - context.window, - windowSerializer, - windowStateDescriptor); + windowState.setCurrentNamespace(context.window); + mergingWindows = null; } ACC contents = windowState.get(); @@ -440,8 +465,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.getKey(); context.window = timer.getNamespace(); - AppendingState<IN, ACC> windowState; - MergingWindowSet<W> mergingWindows = null; + MergingWindowSet<W> mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); @@ -452,9 +476,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // so it is safe to just ignore return; } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(stateWindow); } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + windowState.setCurrentNamespace(context.window); + mergingWindows = null; } ACC contents = windowState.get(); @@ -507,13 +532,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * {@link MergingWindowSet#persist()}. */ protected MergingWindowSet<W> getMergingWindowSet() throws Exception { - ListState<Tuple2<W, W>> mergeState = - getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor); - - @SuppressWarnings({"unchecked", "rawtypes"}) - MergingWindowSet<W> mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState); - - return mergingWindows; + @SuppressWarnings("unchecked") + MergingWindowAssigner<? super IN, W> mergingAssigner = (MergingWindowAssigner<? super IN, W>) windowAssigner; + return new MergingWindowSet<>(mergingAssigner, mergingSetsState); } /** @@ -655,11 +676,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) { if (mergedWindows != null && mergedWindows.size() > 0) { try { - WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(window, - mergedWindows, - windowSerializer, - stateDescriptor); - } catch (Exception e) { + S rawState = getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor); + + if (rawState instanceof InternalMergingState) { + @SuppressWarnings("unchecked") + InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState; + mergingState.mergeNamespaces(window, mergedWindows); + } + else { + throw new IllegalArgumentException( + "The given state descriptor does not refer to a mergeable state (MergingState)"); + } + } + catch (Exception e) { throw new RuntimeException("Error while merging state.", e); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 0e2d1e8..2faa506 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -72,6 +72,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -83,6 +84,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("serial") public class WindowOperatorTest extends TestLogger { // For counting if close() is called the correct number of times on the SumReducer @@ -758,7 +760,7 @@ public class WindowOperatorTest extends TestLogger { 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -846,7 +848,7 @@ public class WindowOperatorTest extends TestLogger { 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -1124,7 +1126,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1184,7 +1186,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1250,7 +1252,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1310,7 +1312,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1385,7 +1387,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1475,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1559,7 +1561,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1643,7 +1645,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1736,7 +1738,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1821,7 +1823,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1902,11 +1904,11 @@ public class WindowOperatorTest extends TestLogger { BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), windowStateDesc, new InternalIterableWindowFunction<>(new PassThroughFunction2()), - new EventTimeTriggerAccumGC(LATENESS), + new EventTimeTriggerAccumGC(LATENESS), LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -1929,7 +1931,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } - private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> { + private static class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -1960,7 +1962,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2006,7 +2008,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2063,7 +2065,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2108,7 +2110,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2152,7 +2154,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2172,6 +2174,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.close(); } + // TODO this test seems invalid, as it uses the unsupported combination of merging windows and folding window state @Test public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception { final int GAP_SIZE = 3; @@ -2206,7 +2209,7 @@ public class WindowOperatorTest extends TestLogger { LATENESS); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; + new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -2230,7 +2233,7 @@ public class WindowOperatorTest extends TestLogger { // UDFs // ------------------------------------------------------------------------ - private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { + private static class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { private static final long serialVersionUID = 1L; @Override @@ -2289,7 +2292,7 @@ public class WindowOperatorTest extends TestLogger { } @SuppressWarnings("unchecked") - private static class Tuple2ResultSortComparator implements Comparator<Object> { + private static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable { @Override public int compare(Object o1, Object o2) { if (o1 instanceof Watermark || o2 instanceof Watermark) { @@ -2311,7 +2314,7 @@ public class WindowOperatorTest extends TestLogger { } @SuppressWarnings("unchecked") - private static class Tuple3ResultSortComparator implements Comparator<Object> { + private static class Tuple3ResultSortComparator implements Comparator<Object>, Serializable { @Override public int compare(Object o1, Object o2) { if (o1 instanceof Watermark || o2 instanceof Watermark) { @@ -2403,15 +2406,11 @@ public class WindowOperatorTest extends TestLogger { * purge the state of the fired window. This is to test the state * garbage collection mechanism. */ - public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> { + public static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private long cleanupTime; - private EventTimeTriggerAccumGC() { - cleanupTime = 0L; - } - public EventTimeTriggerAccumGC(long cleanupTime) { this.cleanupTime = cleanupTime; } http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 0e1aca0..0562443 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -18,9 +18,7 @@ package org.apache.flink.test.query; - import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -30,10 +28,14 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -59,14 +61,18 @@ public final class KVStateRequestSerializerRocksDBTest { */ final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> { - RocksDBKeyedStateBackend2(final JobID jobId, - final String operatorIdentifier, - final ClassLoader userCodeClassLoader, - final File instanceBasePath, final DBOptions dbOptions, - final ColumnFamilyOptions columnFamilyOptions, - final TaskKvStateRegistry kvStateRegistry, - final TypeSerializer<K> keySerializer, final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange) throws Exception { + RocksDBKeyedStateBackend2( + final JobID jobId, + final String operatorIdentifier, + final ClassLoader userCodeClassLoader, + final File instanceBasePath, + final DBOptions dbOptions, + final ColumnFamilyOptions columnFamilyOptions, + final TaskKvStateRegistry kvStateRegistry, + final TypeSerializer<K> keySerializer, + final int numberOfKeyGroups, + final KeyGroupRange keyGroupRange) throws Exception { + super(jobId, operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, @@ -74,9 +80,10 @@ public final class KVStateRequestSerializerRocksDBTest { } @Override - public <N, T> ListState<T> createListState( + public <N, T> InternalListState<N, T> createListState( final TypeSerializer<N> namespaceSerializer, final ListStateDescriptor<T> stateDesc) throws Exception { + return super.createListState(namespaceSerializer, stateDesc); } } @@ -90,8 +97,7 @@ public final class KVStateRequestSerializerRocksDBTest { */ @Test public void testListSerialization() throws Exception { - final long key = 0l; - TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; + final long key = 0L; // objects for RocksDB state list serialisation DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); @@ -110,9 +116,10 @@ public final class KVStateRequestSerializerRocksDBTest { ); longHeapKeyedStateBackend.setCurrentKey(key); - final ListState<Long> listState = longHeapKeyedStateBackend + final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend .createListState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + KvStateRequestSerializerTest.testListSerialization(key, listState); } }