http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java index 9f19064..d7df479 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java @@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType; import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; @@ -84,7 +83,7 @@ public class WindowOperatorMigrationTest { @Parameterized.Parameters(name = "Migration Savepoint: {0}") public static Collection<MigrationVersion> parameters () { - return Arrays.asList(MigrationVersion.v1_1, MigrationVersion.v1_2, MigrationVersion.v1_3); + return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3); } /** @@ -753,219 +752,6 @@ public class WindowOperatorMigrationTest { testHarness.close(); } - /** - * Manually run this to write binary snapshot data. 
- */ - @Ignore - @Test - public void writeAggregatingAlignedProcessingTimeWindowsSnapshot() throws Exception { - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator = - new AggregatingProcessingTimeWindowOperator<>( - new ReduceFunction<Tuple2<String, Integer>>() { - private static final long serialVersionUID = -8913160567151867987L; - - @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { - return new Tuple2<>(value1.f0, value1.f1 + value2.f1); - } - }, - new TupleKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - inputType.createSerializer(new ExecutionConfig()), - 3000, - 3000); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.open(); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); - - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - - // do a snapshot, close and restore again - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - OperatorSnapshotUtil.writeStateHandle( - snapshot, - "src/test/resources/win-op-migration-test-aggr-aligned-flink" + flinkGenerateSavepointVersion + "-snapshot"); - testHarness.close(); - } - - @Test - public void testRestoreAggregatingAlignedProcessingTimeWindows() throws Exception { - final int windowSize = 3; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", - new SumReducer(), - inputType.createSerializer(new ExecutionConfig())); - - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)), - new TimeWindow.Serializer(), - new TupleKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - stateDesc, - new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), - ProcessingTimeTrigger.create(), - 0, - null /* late data output tag */, - LegacyWindowOperatorType.FAST_AGGREGATING); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - testHarness.setup(); - - MigrationTestUtil.restoreFromSnapshot( - testHarness, - OperatorSnapshotUtil.getResourceFilename( - "win-op-migration-test-aggr-aligned-flink" + testMigrateVersion + "-snapshot"), - testMigrateVersion); - - testHarness.open(); - - testHarness.setProcessingTime(5000); - - expectedOutput.add(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2999)); - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - - testHarness.setProcessingTime(7000); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - - testHarness.close(); - } - - /** - * Manually run this to write binary snapshot data. - */ - @Ignore - @Test - public void writeAlignedProcessingTimeWindowsSnapshot() throws Exception { - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>> operator = - new AccumulatingProcessingTimeWindowOperator<>( - new InternalIterableWindowFunction<>(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { - - private static final long serialVersionUID = 6551516443265733803L; - - @Override - public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { - int sum = 0; - for (Tuple2<String, Integer> anInput : input) { - sum += anInput.f1; - } - out.collect(new Tuple2<>(s, sum)); - } - }), - new TupleKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - inputType.createSerializer(new ExecutionConfig()), - 3000, - 3000); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.open(); - - testHarness.setProcessingTime(3); - - // timestamp is ignored in processing time - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000)); - - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - - // do a snapshot, close and restore again - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - OperatorSnapshotUtil.writeStateHandle( - snapshot, - "src/test/resources/win-op-migration-test-accum-aligned-flink" + flinkGenerateSavepointVersion + "-snapshot"); - testHarness.close(); - } - - @Test - public void testRestoreAccumulatingAlignedProcessingTimeWindows() throws Exception { - final int windowSize = 3; - - TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - - ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", - new SumReducer(), - inputType.createSerializer(new ExecutionConfig())); - - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingProcessingTimeWindows.of(Time.of(windowSize, 
TimeUnit.SECONDS)), - new TimeWindow.Serializer(), - new TupleKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - stateDesc, - new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), - ProcessingTimeTrigger.create(), - 0, - null /* late data output tag */, - LegacyWindowOperatorType.FAST_ACCUMULATING); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - testHarness.setup(); - - MigrationTestUtil.restoreFromSnapshot( - testHarness, - OperatorSnapshotUtil.getResourceFilename( - "win-op-migration-test-accum-aligned-flink" + testMigrateVersion + "-snapshot"), - testMigrateVersion); - - testHarness.open(); - - testHarness.setProcessingTime(5000); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999)); - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000)); - - testHarness.setProcessingTime(7000); - - expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator()); - - testHarness.close(); - } - private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { private static final long serialVersionUID = 1L;
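For orientation, the restore path that remains in these migration tests after dropping the Flink 1.1 branch boils down to the sketch below. It is only an illustration, not part of the patch: it reuses the operator, TupleKeySelector, testMigrateVersion and harness types already defined in WindowOperatorMigrationTest, and the snapshot resource name is a placeholder rather than a file introduced by this change.

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    testHarness.setup();

    // Snapshots written by Flink 1.2+ are read directly as OperatorStateHandles;
    // the legacy initializeStateFromLegacyCheckpoint() path for 1.1 savepoints no longer exists.
    MigrationTestUtil.restoreFromSnapshot(
        testHarness,
        OperatorSnapshotUtil.getResourceFilename(
            "win-op-migration-test-<variant>-flink" + testMigrateVersion + "-snapshot"),
        testMigrateVersion);

    testHarness.open();
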
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 8748ed4..821438e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -63,7 +63,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; @@ -336,32 +335,6 @@ public class WindowTranslationTest { processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); } - - /** - * Ignored because we currently don't have the fast processing-time window operator. - */ - @Test - @SuppressWarnings("rawtypes") - @Ignore - public void testReduceFastProcessingTime() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); - - DataStream<Tuple2<String, Integer>> window = source - .keyBy(new TupleKeySelector()) - .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduce(new DummyReducer()); - - OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = - (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation(); - OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator(); - Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator); - - processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1)); - } - @Test @SuppressWarnings("rawtypes") public void testReduceWithWindowFunctionEventTime() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 14ae733..f73499c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -64,7 +64,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; 
-import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -75,7 +74,6 @@ import org.junit.Test; import java.io.EOFException; import java.io.IOException; -import java.io.Serializable; import java.net.URL; import java.util.Collection; import java.util.Collections; @@ -106,12 +104,6 @@ public class InterruptSensitiveRestoreTest { private static final int OPERATOR_RAW = 1; private static final int KEYED_MANAGED = 2; private static final int KEYED_RAW = 3; - private static final int LEGACY = 4; - - @Test - public void testRestoreWithInterruptLegacy() throws Exception { - testRestoreWithInterrupt(LEGACY); - } @Test public void testRestoreWithInterruptOperatorManaged() throws Exception { @@ -137,18 +129,15 @@ public class InterruptSensitiveRestoreTest { IN_RESTORE_LATCH.reset(); Configuration taskConfig = new Configuration(); - StreamConfig streamConfig = new StreamConfig(taskConfig); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + StreamConfig cfg = new StreamConfig(taskConfig); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); switch (mode) { case OPERATOR_MANAGED: case OPERATOR_RAW: case KEYED_MANAGED: case KEYED_RAW: - streamConfig.setStateKeySerializer(IntSerializer.INSTANCE); - streamConfig.setStreamOperator(new StreamSource<>(new TestSource())); - break; - case LEGACY: - streamConfig.setStreamOperator(new StreamSource<>(new TestSourceLegacy())); + cfg.setStateKeySerializer(IntSerializer.INSTANCE); + cfg.setStreamOperator(new StreamSource<>(new TestSource(mode))); break; default: throw new IllegalArgumentException(); @@ -156,7 +145,7 @@ public class InterruptSensitiveRestoreTest { StreamStateHandle lockingHandle = new InterruptLockingStateHandle(); - Task task = createTask(streamConfig, taskConfig, lockingHandle, mode); + Task task = createTask(cfg, taskConfig, lockingHandle, mode); // start the task and wait until it is in "restore" task.startTaskThread(); @@ -180,16 +169,15 @@ public class InterruptSensitiveRestoreTest { // ------------------------------------------------------------------------ private static Task createTask( - StreamConfig streamConfig, - Configuration taskConfig, - StreamStateHandle state, - int mode) throws IOException { + StreamConfig streamConfig, + Configuration taskConfig, + StreamStateHandle state, + int mode) throws IOException { NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - StreamStateHandle operatorState = null; Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList(); Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList(); Collection<OperatorStateHandle> operatorStateBackend = Collections.emptyList(); @@ -206,7 +194,7 @@ public class InterruptSensitiveRestoreTest { Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); List<KeyedStateHandle> keyedStateHandles = - Collections.<KeyedStateHandle>singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); + Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); switch (mode) { case OPERATOR_MANAGED: @@ -221,15 +209,11 @@ public class InterruptSensitiveRestoreTest { case KEYED_RAW: keyedStateFromStream 
= keyedStateHandles; break; - case LEGACY: - operatorState = state; - break; default: throw new IllegalArgumentException(); } OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - operatorState, operatorStateBackend, operatorStateStream, keyedStateFromBackend, @@ -238,14 +222,13 @@ public class InterruptSensitiveRestoreTest { JobVertexID jobVertexID = new JobVertexID(); OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID); streamConfig.setOperatorID(operatorID); - TaskStateSnapshot stateSnapshot = new TaskStateSnapshot(); stateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); JobInformation jobInformation = new JobInformation( new JobID(), "test job name", new SerializedValue<>(new ExecutionConfig()), - taskConfig, + new Configuration(), Collections.<BlobKey>emptyList(), Collections.<URL>emptyList()); @@ -302,11 +285,11 @@ public class InterruptSensitiveRestoreTest { FSDataInputStream is = new FSDataInputStream() { @Override - public void seek(long desired) throws IOException { + public void seek(long desired) { } @Override - public long getPos() throws IOException { + public long getPos() { return 0; } @@ -358,33 +341,15 @@ public class InterruptSensitiveRestoreTest { // ------------------------------------------------------------------------ - private static class TestSourceLegacy implements SourceFunction<Object>, Checkpointed<Serializable> { + private static class TestSource implements SourceFunction<Object>, CheckpointedFunction { private static final long serialVersionUID = 1L; + private final int testType; - @Override - public void run(SourceContext<Object> ctx) throws Exception { - fail("should never be called"); + public TestSource(int testType) { + this.testType = testType; } @Override - public void cancel() {} - - @Override - public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - fail("should never be called"); - return null; - } - - @Override - public void restoreState(Serializable state) throws Exception { - fail("should never be called"); - } - } - - private static class TestSource implements SourceFunction<Object>, CheckpointedFunction { - private static final long serialVersionUID = 1L; - - @Override public void run(SourceContext<Object> ctx) throws Exception { fail("should never be called"); } @@ -399,6 +364,8 @@ public class InterruptSensitiveRestoreTest { @Override public void initializeState(FunctionInitializationContext context) throws Exception { + // raw keyed state is already read by timer service, all others to initialize the context...we only need to + // trigger this manually. 
((StateInitializationContext) context).getRawOperatorStateInputs().iterator().next().getStream().read(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 3190620..8d80d66 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -28,8 +28,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; @@ -47,21 +45,18 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -69,7 +64,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -78,7 +72,6 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -93,7 +86,7 @@ import static org.junit.Assert.fail; public class OneInputStreamTaskTest extends TestLogger { private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR = - new ListStateDescriptor<>("test", new IntSerializer()); + new ListStateDescriptor<>("test", new IntSerializer()); /** * This test verifies that open() and close() are correctly called. 
This test also verifies @@ -129,8 +122,8 @@ public class OneInputStreamTaskTest extends TestLogger { assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled); TestHarnessUtil.assertOutputEquals("Output was not correct.", - expectedOutput, - testHarness.getOutput()); + expectedOutput, + testHarness.getOutput()); } /** @@ -174,8 +167,8 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.waitForInputProcessing(); expectedOutput.add(new Watermark(initialTime)); TestHarnessUtil.assertOutputEquals("Output was not correct.", - expectedOutput, - testHarness.getOutput()); + expectedOutput, + testHarness.getOutput()); // contrary to checkpoint barriers these elements are not blocked by watermarks testHarness.processElement(new StreamRecord<String>("Hello", initialTime)); @@ -215,7 +208,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.processElement(new Watermark(initialTime + 6), 0, 0); testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, - // watermark (initial + 6) should be forwarded + // watermark (initial + 6) should be forwarded testHarness.waitForInputProcessing(); expectedOutput.add(new Watermark(initialTime + 5)); expectedOutput.add(new Watermark(initialTime + 6)); @@ -263,21 +256,16 @@ public class OneInputStreamTaskTest extends TestLogger { // ------------------ setup the chain ------------------ TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator(); - OperatorID headOperatorId = new OperatorID(); - StreamConfig headOperatorConfig = testHarness.getStreamConfig(); WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator(); - OperatorID watermarkOperatorId = new OperatorID(); - StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration()); TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator(); - OperatorID tailOperatorId = new OperatorID(); StreamConfig tailOperatorConfig = new StreamConfig(new Configuration()); headOperatorConfig.setStreamOperator(headOperator); - headOperatorConfig.setOperatorID(headOperatorId); + headOperatorConfig.setOperatorID(new OperatorID(42L, 42L)); headOperatorConfig.setChainStart(); headOperatorConfig.setChainIndex(0); headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( @@ -290,7 +278,7 @@ public class OneInputStreamTaskTest extends TestLogger { ))); watermarkOperatorConfig.setStreamOperator(watermarkOperator); - watermarkOperatorConfig.setOperatorID(watermarkOperatorId); + watermarkOperatorConfig.setOperatorID(new OperatorID(4711L, 42L)); watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); watermarkOperatorConfig.setChainIndex(1); watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( @@ -312,7 +300,7 @@ public class OneInputStreamTaskTest extends TestLogger { null)); tailOperatorConfig.setStreamOperator(tailOperator); - tailOperatorConfig.setOperatorID(tailOperatorId); + tailOperatorConfig.setOperatorID(new OperatorID(123L, 123L)); tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); tailOperatorConfig.setBufferTimeout(0); tailOperatorConfig.setChainIndex(2); @@ -555,13 +543,11 @@ public class OneInputStreamTaskTest extends TestLogger { long checkpointId = 1L; long checkpointTimestamp = 1L; - long recoveryTimestamp = 3L; - 
long seed = 2L; int numberChainedTasks = 11; StreamConfig streamConfig = testHarness.getStreamConfig(); - configureChainedTestingStreamOperator(streamConfig, numberChainedTasks, seed, recoveryTimestamp); + configureChainedTestingStreamOperator(streamConfig, numberChainedTasks); AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment( testHarness.jobConfig, @@ -599,7 +585,7 @@ public class OneInputStreamTaskTest extends TestLogger { StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig(); - configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks, seed, recoveryTimestamp); + configureChainedTestingStreamOperator(restoredTaskStreamConfig, numberChainedTasks); TaskStateSnapshot stateHandles = env.getCheckpointStateHandles(); Assert.assertEquals(numberChainedTasks, stateHandles.getSubtaskStateMappings().size()); @@ -625,16 +611,12 @@ public class OneInputStreamTaskTest extends TestLogger { private void configureChainedTestingStreamOperator( StreamConfig streamConfig, - int numberChainedTasks, - long seed, - long recoveryTimestamp) { + int numberChainedTasks) { Preconditions.checkArgument(numberChainedTasks >= 1, "The operator chain must at least " + "contain one operator."); - Random random = new Random(seed); - - TestingStreamOperator<Integer, Integer> previousOperator = new TestingStreamOperator<>(random.nextLong(), recoveryTimestamp); + TestingStreamOperator<Integer, Integer> previousOperator = new TestingStreamOperator<>(); streamConfig.setStreamOperator(previousOperator); streamConfig.setOperatorID(new OperatorID(0L, 0L)); @@ -643,7 +625,7 @@ public class OneInputStreamTaskTest extends TestLogger { List<StreamEdge> outputEdges = new ArrayList<>(numberChainedTasks - 1); for (int chainedIndex = 1; chainedIndex < numberChainedTasks; chainedIndex++) { - TestingStreamOperator<Integer, Integer> chainedOperator = new TestingStreamOperator<>(random.nextLong(), recoveryTimestamp); + TestingStreamOperator<Integer, Integer> chainedOperator = new TestingStreamOperator<>(); StreamConfig chainedConfig = new StreamConfig(new Configuration()); chainedConfig.setStreamOperator(chainedOperator); chainedConfig.setOperatorID(new OperatorID(0L, chainedIndex)); @@ -702,17 +684,17 @@ public class OneInputStreamTaskTest extends TestLogger { } AcknowledgeStreamMockEnvironment( - Configuration jobConfig, Configuration taskConfig, - ExecutionConfig executionConfig, long memorySize, - MockInputSplitProvider inputSplitProvider, int bufferSize) { + Configuration jobConfig, Configuration taskConfig, + ExecutionConfig executionConfig, long memorySize, + MockInputSplitProvider inputSplitProvider, int bufferSize) { super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize); } @Override public void acknowledgeCheckpoint( - long checkpointId, - CheckpointMetrics checkpointMetrics, - TaskStateSnapshot checkpointStateHandles) { + long checkpointId, + CheckpointMetrics checkpointMetrics, + TaskStateSnapshot checkpointStateHandles) { this.checkpointId = checkpointId; this.checkpointStateHandles = checkpointStateHandles; @@ -729,19 +711,14 @@ public class OneInputStreamTaskTest extends TestLogger { } private static class TestingStreamOperator<IN, OUT> - extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator { + extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT> { private static final long serialVersionUID = 774614855940397174L; public static int 
numberRestoreCalls = 0; public static int numberSnapshotCalls = 0; - private final long seed; - private final long recoveryTimestamp; - - private transient Random random; - @Override public void open() throws Exception { super.open(); @@ -767,7 +744,7 @@ public class OneInputStreamTaskTest extends TestLogger { @Override public void snapshotState(StateSnapshotContext context) throws Exception { ListState<Integer> partitionableState = - getOperatorStateBackend().getListState(TEST_DESCRIPTOR); + getOperatorStateBackend().getListState(TEST_DESCRIPTOR); partitionableState.clear(); partitionableState.add(42); @@ -778,59 +755,14 @@ public class OneInputStreamTaskTest extends TestLogger { @Override public void initializeState(StateInitializationContext context) throws Exception { - - } - - TestingStreamOperator(long seed, long recoveryTimestamp) { - this.seed = seed; - this.recoveryTimestamp = recoveryTimestamp; - } - - @Override - public void processElement(StreamRecord<IN> element) throws Exception { - - } - - @Override - public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - if (random == null) { - random = new Random(seed); + if (context.isRestored()) { + ++numberRestoreCalls; } - - Serializable functionState = generateFunctionState(); - Integer operatorState = generateOperatorState(); - - InstantiationUtil.serializeObject(out, functionState); - InstantiationUtil.serializeObject(out, operatorState); } @Override - public void restoreState(FSDataInputStream in) throws Exception { - numberRestoreCalls++; - - if (random == null) { - random = new Random(seed); - } - - assertEquals(this.recoveryTimestamp, recoveryTimestamp); - - assertNotNull(in); - - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - Serializable functionState = InstantiationUtil.deserializeObject(in, cl); - Integer operatorState = InstantiationUtil.deserializeObject(in, cl); - - assertEquals(random.nextInt(), functionState); - assertEquals(random.nextInt(), (int) operatorState); - } - - private Serializable generateFunctionState() { - return random.nextInt(); - } + public void processElement(StreamRecord<IN> element) throws Exception { - private Integer generateOperatorState() { - return random.nextInt(); } } @@ -913,8 +845,8 @@ public class OneInputStreamTaskTest extends TestLogger { * <p>If it receives a watermark when it's not expecting one, it'll throw an exception and fail. 
*/ private static class TriggerableFailOnWatermarkTestOperator - extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { + extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { private static final long serialVersionUID = 2048954179291813243L; http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 08c3207..a2dc6c4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -81,7 +81,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -139,7 +138,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; import static org.powermock.api.mockito.PowerMockito.whenNew; /** @@ -307,9 +305,9 @@ public class StreamTaskTest extends TestLogger { streamTask.setEnvironment(mockEnvironment); // mock the operators - StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator<?> streamOperator1 = mock(StreamOperator.class); + StreamOperator<?> streamOperator2 = mock(StreamOperator.class); + StreamOperator<?> streamOperator3 = mock(StreamOperator.class); // mock the returned snapshots OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); @@ -321,15 +319,6 @@ public class StreamTaskTest extends TestLogger { when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2); when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException); - // mock the returned legacy snapshots - StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); - StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); - StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); - - when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1); - when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2); - 
when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3); - OperatorID operatorID1 = new OperatorID(); OperatorID operatorID2 = new OperatorID(); OperatorID operatorID3 = new OperatorID(); @@ -359,10 +348,6 @@ public class StreamTaskTest extends TestLogger { verify(operatorSnapshotResult1).cancel(); verify(operatorSnapshotResult2).cancel(); - - verify(streamStateHandle1).discardState(); - verify(streamStateHandle2).discardState(); - verify(streamStateHandle3).discardState(); } /** @@ -384,12 +369,12 @@ public class StreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); streamTask.setEnvironment(mockEnvironment); - StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - - // mock the new state handles / futures + // mock the operators + StreamOperator<?> streamOperator1 = mock(StreamOperator.class); + StreamOperator<?> streamOperator2 = mock(StreamOperator.class); + StreamOperator<?> streamOperator3 = mock(StreamOperator.class); + // mock the new state operator snapshots OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class); OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class); @@ -403,15 +388,6 @@ public class StreamTaskTest extends TestLogger { when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2); when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3); - // mock the legacy state snapshot - StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); - StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); - StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); - - when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1); - when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2); - when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3); - OperatorID operatorID1 = new OperatorID(); OperatorID operatorID2 = new OperatorID(); OperatorID operatorID3 = new OperatorID(); @@ -438,10 +414,6 @@ public class StreamTaskTest extends TestLogger { verify(operatorSnapshotResult1).cancel(); verify(operatorSnapshotResult2).cancel(); verify(operatorSnapshotResult3).cancel(); - - verify(streamStateHandle1).discardState(); - verify(streamStateHandle2).discardState(); - verify(streamStateHandle3).discardState(); } /** @@ -481,7 +453,7 @@ public class StreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); streamTask.setEnvironment(mockEnvironment); - StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator<?> streamOperator = 
mock(StreamOperator.class); KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); @@ -581,7 +553,6 @@ public class StreamTaskTest extends TestLogger { whenNew(OperatorSubtaskState.class). withArguments( - any(StreamStateHandle.class), anyCollectionOf(OperatorStateHandle.class), anyCollectionOf(OperatorStateHandle.class), anyCollectionOf(KeyedStateHandle.class), @@ -593,11 +564,10 @@ public class StreamTaskTest extends TestLogger { completeSubtask.await(); Object[] arguments = invocation.getArguments(); return new OperatorSubtaskState( - (StreamStateHandle) arguments[0], + (OperatorStateHandle) arguments[0], (OperatorStateHandle) arguments[1], - (OperatorStateHandle) arguments[2], - (KeyedStateHandle) arguments[3], - (KeyedStateHandle) arguments[4] + (KeyedStateHandle) arguments[2], + (KeyedStateHandle) arguments[3] ); } }); @@ -606,7 +576,7 @@ public class StreamTaskTest extends TestLogger { CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); streamTask.setEnvironment(mockEnvironment); - final StreamOperator<?> streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + final StreamOperator<?> streamOperator = mock(StreamOperator.class); final OperatorID operatorID = new OperatorID(); when(streamOperator.getOperatorID()).thenReturn(operatorID); @@ -717,7 +687,7 @@ public class StreamTaskTest extends TestLogger { // mock the operators StreamOperator<?> statelessOperator = - mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + mock(StreamOperator.class); final OperatorID operatorID = new OperatorID(); when(statelessOperator.getOperatorID()).thenReturn(operatorID); http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 720346a..9156f34 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -25,11 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; -import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; -import org.apache.flink.migration.util.MigrationInstantiationUtil; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; @@ -41,12 +36,10 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.CheckpointStreamFactory; import 
org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -54,7 +47,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; @@ -72,7 +64,6 @@ import org.apache.flink.util.Preconditions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.FileInputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -308,36 +299,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { setupCalled = true; } - public void initializeStateFromLegacyCheckpoint(String checkpointFilename) throws Exception { - - FileInputStream fin = new FileInputStream(checkpointFilename); - StreamTaskState state = MigrationInstantiationUtil.deserializeObject(fin, ClassLoader.getSystemClassLoader()); - fin.close(); - - if (!setupCalled) { - setup(); - } - - StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state); - - List<KeyedStateHandle> keyGroupStatesList = new ArrayList<>(); - if (state.getKvStates() != null) { - KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState( - state.getKvStates(), - environment.getTaskInfo().getIndexOfThisSubtask(), - 0); - keyGroupStatesList.add(keyedStateHandle); - } - - // finally calling the initializeState() with the legacy operatorStateHandles - initializeState(new OperatorStateHandles(0, - stateHandle, - keyGroupStatesList, - Collections.<KeyedStateHandle>emptyList(), - Collections.<OperatorStateHandle>emptyList(), - Collections.<OperatorStateHandle>emptyList())); - } - /** * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorSubtaskState)}. 
* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} @@ -397,7 +358,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { numSubtasks).get(subtaskIndex); OperatorSubtaskState massagedOperatorStateHandles = new OperatorSubtaskState( - operatorStateHandles.getLegacyOperatorState(), nullToEmptyCollection(localManagedOperatorState), nullToEmptyCollection(localRawOperatorState), nullToEmptyCollection(localManagedKeyGroupState), @@ -473,7 +433,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { return new OperatorStateHandles( 0, - null, mergedManagedKeyedState, mergedRawKeyedState, mergedManagedOperatorState, @@ -497,8 +456,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { */ public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception { - CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op"); - OperatorSnapshotResult operatorStateResult = operator.snapshotState( checkpointId, timestamp, @@ -510,21 +467,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture()); OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture()); - // also snapshot legacy state, if any - StreamStateHandle legacyStateHandle = null; - - if (operator instanceof StreamCheckpointedOperator) { - - final CheckpointStreamFactory.CheckpointStateOutputStream outStream = - streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - - ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); - legacyStateHandle = outStream.closeAndGetHandle(); - } - return new OperatorStateHandles( 0, - legacyStateHandle, keyedManaged != null ? Collections.singletonList(keyedManaged) : null, keyedRaw != null ? Collections.singletonList(keyedRaw) : null, opManaged != null ? Collections.singletonList(opManaged) : null, @@ -532,24 +476,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { } /** - * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if - * the operator implements this interface. - */ - @Deprecated - public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { - - CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( - new JobID(), - "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); - if (operator instanceof StreamCheckpointedOperator) { - ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); - return outStream.closeAndGetHandle(); - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}. */ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { @@ -557,22 +483,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { } /** - * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if - * the operator implements this interface. 
- */ - @Deprecated - @SuppressWarnings("deprecation") - public void restore(StreamStateHandle snapshot) throws Exception { - if (operator instanceof StreamCheckpointedOperator) { - try (FSDataInputStream in = snapshot.openInputStream()) { - ((StreamCheckpointedOperator) operator).restoreState(in); - } - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** * Calls close and dispose on the operator. */ public void close() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 0d42d9f..c2ec63a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -23,33 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.util.Migration; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.concurrent.RunnableFuture; import static org.mockito.Matchers.any; import static org.mockito.Mockito.anyInt; @@ -142,61 +132,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } } - /** - * - */ - @Override - public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { - // simply use an in-memory handle - MemoryStateBackend backend = new MemoryStateBackend(); - - CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op"); - CheckpointStreamFactory.CheckpointStateOutputStream outStream = - streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp); - - if (operator instanceof StreamCheckpointedOperator) { - ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); - } 
- - if (keyedStateBackend != null) { - RunnableFuture<KeyedStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot( - checkpointId, - timestamp, - streamFactory, - CheckpointOptions.forFullCheckpoint()); - if (!keyedSnapshotRunnable.isDone()) { - Thread runner = new Thread(keyedSnapshotRunnable); - runner.start(); - } - outStream.write(1); - ObjectOutputStream oos = new ObjectOutputStream(outStream); - oos.writeObject(keyedSnapshotRunnable.get()); - oos.flush(); - } else { - outStream.write(0); - } - return outStream.closeAndGetHandle(); - } - - /** - * - */ - @Override - public void restore(StreamStateHandle snapshot) throws Exception { - try (FSDataInputStream inStream = snapshot.openInputStream()) { - - if (operator instanceof StreamCheckpointedOperator) { - ((StreamCheckpointedOperator) operator).restoreState(inStream); - } - - byte keyedStatePresent = (byte) inStream.read(); - if (keyedStatePresent == 1) { - ObjectInputStream ois = new ObjectInputStream(inStream); - this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject()); - } - } - } - private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) { for (KeyedStateHandle handle : allKeyGroupsHandles) { if (handle instanceof Migration) { http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java index 7e32723..33f32e9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.util; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import java.io.DataInputStream; @@ -53,7 +52,8 @@ public class OperatorSnapshotUtil { dos.writeInt(state.getOperatorChainIndex()); - SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); + // still required for compatibility + SavepointV1Serializer.serializeStreamStateHandle(null, dos); Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); if (rawOperatorState != null) { @@ -108,7 +108,8 @@ public class OperatorSnapshotUtil { try (DataInputStream dis = new DataInputStream(in)) { int index = dis.readInt(); - StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis); + // still required for compatibility to consume the bytes. 
+ SavepointV1Serializer.deserializeStreamStateHandle(dis); List<OperatorStateHandle> rawOperatorState = null; int numRawOperatorStates = dis.readInt(); @@ -154,7 +155,12 @@ public class OperatorSnapshotUtil { } } - return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState); + return new OperatorStateHandles( + index, + managedKeyedState, + rawKeyedState, + managedOperatorState, + rawOperatorState); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java index f723b34..1c95a04 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java @@ -29,22 +29,16 @@ public class MigrationTestUtil { /** * Restore from a snapshot taken with an older Flink version. * - * @param testHarness the test harness to restore the snapshot to. - * @param snapshotPath the absolute path to the snapshot. + * @param testHarness the test harness to restore the snapshot to. + * @param snapshotPath the absolute path to the snapshot. * @param snapshotFlinkVersion the Flink version of the snapshot. - * * @throws Exception */ public static void restoreFromSnapshot( - AbstractStreamOperatorTestHarness<?> testHarness, - String snapshotPath, - MigrationVersion snapshotFlinkVersion) throws Exception { + AbstractStreamOperatorTestHarness<?> testHarness, + String snapshotPath, + MigrationVersion snapshotFlinkVersion) throws Exception { - if (snapshotFlinkVersion == MigrationVersion.v1_1) { - // Flink 1.1 snapshots should be read using the legacy restore method - testHarness.initializeStateFromLegacyCheckpoint(snapshotPath); - } else { - testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath)); - } + testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala index 104400f..35a56d7 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala @@ -25,15 +25,15 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows, SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows} +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import 
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index 104400f..35a56d7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -25,15 +25,15 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows, SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.{AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator, WindowOperator}
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 /**
  * These tests verify that the api calls on [[WindowedStream]] that use the "time" shortcut
@@ -85,59 +85,6 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
   }
 
-  /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   */
-  @Test
-  def testReduceAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-      .reduce(new DummyReducer())
-
-    val transform1 = window1.javaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-  }
-
-  /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   */
-  @Test
-  def testApplyAlignedTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val window1 = source
-      .keyBy(0)
-      .window(TumblingAlignedProcessingTimeWindows.of(Time.minutes(1)))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-          key: Tuple,
-          window: TimeWindow,
-          values: Iterable[(String, Int)],
-          out: Collector[(String, Int)]) { }
-      })
-
-    val transform1 = window1.javaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
-  }
-
   @Test
   def testReduceEventTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index cad6693..99fb6ef 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -975,7 +974,7 @@ public class RescalingITCase extends TestLogger {
 		}
 	}
 
-	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
+	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction {
 
 		private static final long serialVersionUID = -359715965103593462L;
 		private static final int NUM_PARTITIONS = 7;
@@ -1030,10 +1029,5 @@ public class RescalingITCase extends TestLogger {
 				checkCorrectRestore[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 			}
 		}
-
-		@Override
-		public void restoreState(Integer state) throws Exception {
-			counterPartitions.add(state);
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index cc23545..1b7dafa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -351,10 +351,6 @@ public class SavepointITCase extends TestLogger {
 
 				OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
 				assertNotNull(subtaskState);
-
-				errMsg = "Initial operator state mismatch.";
-				assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
-					tdd.getTaskStateHandles().getSubtaskStateByOperatorID(operatorState.getOperatorID()).getLegacyOperatorState());
 			}
 		}
 
@@ -377,17 +373,18 @@ public class SavepointITCase extends TestLogger {
 			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
 
 			// - Verification START -------------------------------------------
-
 			// The checkpoint files
 			List<File> checkpointFiles = new ArrayList<>();
 
 			for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
 				for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					StreamStateHandle streamTaskState = subtaskState.getLegacyOperatorState();
+					Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
 
-					if (streamTaskState != null) {
-						FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState;
-						checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+					if (streamTaskState != null && !streamTaskState.isEmpty()) {
+						for (OperatorStateHandle osh : streamTaskState) {
+							FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
+							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+						}
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 21be7ba..eccc7e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +40,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -64,6 +66,11 @@ import static junit.framework.Assert.fail;
 */
 public class SavepointMigrationTestBase extends TestBaseUtils {
 
+	@BeforeClass
+	public static void before() {
+		SavepointSerializers.setFailWhenLegacyStateDetected(false);
+	}
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
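The new @BeforeClass hook runs once before any test method of a subclass, so SavepointSerializers.setFailWhenLegacyStateDetected(false) is already in effect when the first old-format savepoint is deserialized. A short sketch of a hypothetical subclass follows; the class name, test name, and test body are illustrative only, imports are omitted, and only SavepointMigrationTestBase and SavepointSerializers are real types from this change.

	// Hypothetical subclass; it inherits SavepointMigrationTestBase.before(), so a
	// savepoint that still carries legacy state is tolerated during deserialization
	// instead of causing the restore to fail fast.
	public class ExampleSavepointMigrationITCase extends SavepointMigrationTestBase {

		@Test
		public void testRestoreFromOldSavepoint() throws Exception {
			// Resume a job from a savepoint written by an older Flink version using
			// the base class utilities; job construction and result verification
			// are test specific and omitted here.
		}
	}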