Repository: flink Updated Branches: refs/heads/master 2d191ab05 -> a2eb6cc87
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java new file mode 100644 index 0000000..d3fde9e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.timestamp; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * Tests for timestamps, watermarks, and event-time sources. 
+ */ +@SuppressWarnings("serial") +public class TimestampITCase { + + private static final int NUM_TASK_MANAGERS = 2; + private static final int NUM_TASK_SLOTS = 3; + private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void startCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + + cluster = new ForkableFlinkMiniCluster(config, false); + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to start test cluster: " + e.getMessage()); + } + } + + @AfterClass + public static void shutdownCluster() { + try { + cluster.shutdown(); + cluster = null; + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to stop test cluster: " + e.getMessage()); + } + } + + /** + * These check whether custom timestamp emission works at sources and also whether timestamps + * arrive at operators throughout a topology. + * + * <p> + * This only uses map to test the workings of watermarks in a complete, running topology. All + * tasks and stream operators have dedicated tests that test the watermark propagation + * behaviour. 
+ */ + @Test + public void testWatermarkPropagation() throws Exception { + final int NUM_WATERMARKS = 10; + + long initialTime = 0L; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + + + DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS)); + DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS)); + + source1 + .map(new IdentityMap()) + .connect(source2).map(new IdentityCoMap()) + .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()); + + env.execute(); + + // verify that all the watermarks arrived at the final custom operator + for (int i = 0; i < PARALLELISM; i++) { + for (int j = 0; j < NUM_WATERMARKS; j++) { + if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) { + Assert.fail("Wrong watermark."); + } + } + } + } + + + + /** + * These check whether timestamps are properly assigned at the sources and handled in + * network transmission and between chained operators when timestamps are enabled. 
+ */ + @Test + public void testTimestampHandling() throws Exception { + final int NUM_ELEMENTS = 10; + + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + + + DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS)); + DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS)); + + source1 + .map(new IdentityMap()) + .connect(source2).map(new IdentityCoMap()) + .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); + + env.execute(); + } + + /** + * These check whether timestamps are properly ignored when they are disabled. + */ + @Test + public void testDisabledTimestamps() throws Exception { + final int NUM_ELEMENTS = 10; + + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + Assert.assertEquals("Timestamps are not disabled by default.", false, env.getConfig().areTimestampsEnabled()); + env.getConfig().disableTimestamps(); + + + DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS)); + DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS)); + + source1 + .map(new IdentityMap()) + .connect(source2).map(new IdentityCoMap()) + .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator()); + + env.execute(); + } + + /** + * This tests whether the program throws an exception when an event-time source tries + * to emit without timestamp. 
+ */ + @Test(expected = ProgramInvocationException.class) + public void testEventTimeSourceEmitWithoutTimestamp() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + DataStream<Integer> source1 = env.addSource(new MyErroneousTimestampSource()); + + source1 + .map(new IdentityMap()); + + env.execute(); + } + + /** + * This tests whether the program throws an exception when a regular source tries + * to emit with timestamp. + */ + @Test(expected = ProgramInvocationException.class) + public void testSourceEmitWithTimestamp() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + DataStream<Integer> source1 = env.addSource(new MyErroneousSource()); + + source1 + .map(new IdentityMap()); + + env.execute(); + } + + /** + * This tests whether the program throws an exception when a regular source tries + * to emit a watermark. 
+ */ + @Test(expected = ProgramInvocationException.class) + public void testSourceEmitWatermark() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + + DataStream<Integer> source1 = env.addSource(new MyErroneousWatermarkSource()); + + source1 + .map(new IdentityMap()); + + env.execute(); + } + + public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { + + List<Watermark> watermarks; + public static List<Watermark>[] finalWatermarks = new List[PARALLELISM]; + private long oldTimestamp; + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception { + if (element.getTimestamp() != element.getValue()) { + Assert.fail("Timestamps are not properly handled."); + } + oldTimestamp = element.getTimestamp(); + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + watermarks.add(mark); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + watermarks = new ArrayList<Watermark>(); + } + + @Override + public void close() throws Exception { + super.close(); + finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks; + } + } + + public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception { + if (element.getTimestamp() != element.getValue()) { + Assert.fail("Timestamps are not properly handled."); + } + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + } + } + + public static class DisabledTimestampCheckingOperator extends 
AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { + + @Override + public void processElement(StreamRecord<Integer> element) throws Exception { + if (element.getTimestamp() != 0) { + Assert.fail("Timestamps are not properly handled."); + } + output.collect(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + } + } + + public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> { + @Override + public Integer map1(Integer value) throws Exception { + return value; + } + + @Override + public Integer map2(Integer value) throws Exception { + return value; + } + } + + public static class IdentityMap implements MapFunction<Integer, Integer> { + @Override + public Integer map(Integer value) throws Exception { + return value; + } + } + + public static class MyTimestampSource implements EventTimeSourceFunction<Integer> { + + long initialTime; + int numWatermarks; + + public MyTimestampSource(long initialTime, int numWatermarks) { + this.initialTime = initialTime; + this.numWatermarks = numWatermarks; + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < numWatermarks; i++) { + ctx.collectWithTimestamp(i, initialTime + i); + ctx.emitWatermark(new Watermark(initialTime + i)); + } + } + + @Override + public void cancel() { + + } + } + + public static class MyNonWatermarkingSource implements SourceFunction<Integer> { + + int numWatermarks; + + public MyNonWatermarkingSource(int numWatermarks) { + this.numWatermarks = numWatermarks; + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < numWatermarks; i++) { + ctx.collect(i); + } + } + + @Override + public void cancel() { + + } + } + + // This is an event-time source. This should only emit elements with timestamps. 
The test should + // therefore throw an exception + public static class MyErroneousTimestampSource implements EventTimeSourceFunction<Integer> { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < 10; i++) { + ctx.collect(i); + } + } + + @Override + public void cancel() { + + } + } + + // This is a normal source. This should only emit elements without timestamps. The test should + // therefore throw an exception + public static class MyErroneousSource implements SourceFunction<Integer> { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < 10; i++) { + ctx.collectWithTimestamp(i, 0L); + } + } + + @Override + public void cancel() { + + } + } + + // This is a normal source. This should only emit elements without timestamps. This also + // must not emit watermarks. The test should therefore throw an exception + public static class MyErroneousWatermarkSource implements SourceFunction<Integer> { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + for (int i = 0; i < 10; i++) { + ctx.collect(i); + ctx.emitWatermark(new Watermark(0L)); + } + } + + @Override + public void cancel() { + + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java deleted file mode 100644 index 0467b5f..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.runtime.io.CoReaderIterator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; - -public class MockCoContext<IN1, IN2, OUT> { - // private Collection<IN1> input1; - // private Collection<IN2> input2; - private Iterator<IN1> inputIterator1; - private 
Iterator<IN2> inputIterator2; - private List<OUT> outputs; - - private Output<OUT> collector; - private StreamRecordSerializer<IN1> inDeserializer1; - private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator; - private StreamRecordSerializer<IN2> inDeserializer2; - - public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) { - - if (input1.isEmpty() || input2.isEmpty()) { - throw new RuntimeException("Inputs must not be empty"); - } - - this.inputIterator1 = input1.iterator(); - this.inputIterator2 = input2.iterator(); - - TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next()); - inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1, new ExecutionConfig()); - TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next()); - inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2, new ExecutionConfig()); - - mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2); - - outputs = new ArrayList<OUT>(); - collector = new MockOutput<OUT>(outputs); - } - - private int currentInput = 1; - private StreamRecord<IN1> reuse1; - private StreamRecord<IN2> reuse2; - - private class MockCoReaderIterator extends - CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> { - - public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1, - TypeSerializer<StreamRecord<IN2>> serializer2) { - super(null, serializer1, serializer2); - reuse1 = inDeserializer1.createInstance(); - reuse2 = inDeserializer2.createInstance(); - } - - @Override - public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException { - this.delegate1.setInstance(target1); - this.delegate2.setInstance(target2); - - int inputNumber = nextRecord(); - target1.setObject(reuse1.getObject()); - target2.setObject(reuse2.getObject()); - - return inputNumber; - } - } - - private Integer nextRecord() { - if (inputIterator1.hasNext() && inputIterator2.hasNext()) { - switch 
(currentInput) { - case 1: - return next1(); - case 2: - return next2(); - default: - return 0; - } - } - - if (inputIterator1.hasNext()) { - return next1(); - } - - if (inputIterator2.hasNext()) { - return next2(); - } - - return 0; - } - - private int next1() { - reuse1 = inDeserializer1.createInstance(); - reuse1.setObject(inputIterator1.next()); - currentInput = 2; - return 1; - } - - private int next2() { - reuse2 = inDeserializer2.createInstance(); - reuse2.setObject(inputIterator2.next()); - currentInput = 1; - return 2; - } - - public List<OUT> getOutputs() { - return outputs; - } - - public Output<OUT> getCollector() { - return collector; - } - - public StreamRecordSerializer<IN1> getInDeserializer1() { - return inDeserializer1; - } - - public StreamRecordSerializer<IN2> getInDeserializer2() { - return inDeserializer2; - } - - public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() { - return mockIterator; - } - - public static <IN1, IN2, OUT> List<OUT> createAndExecute(TwoInputStreamOperator<IN1, IN2, OUT> operator, - List<IN1> input1, List<IN2> input2) { - MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); - StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", - new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>()); - - operator.setup(mockContext.collector, runtimeContext); - - try { - operator.open(null); - - StreamRecordSerializer<IN1> inputDeserializer1 = mockContext.getInDeserializer1(); - StreamRecordSerializer<IN2> inputDeserializer2 = mockContext.getInDeserializer2(); - CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter = mockContext.mockIterator; - - boolean isRunning = true; - - int next; - StreamRecord<IN1> reuse1 = inputDeserializer1.createInstance(); - StreamRecord<IN2> reuse2 = inputDeserializer2.createInstance(); - - while (isRunning) { 
- try { - next = coIter.next(reuse1, reuse2); - } catch (IOException e) { - if (isRunning) { - throw new RuntimeException("Could not read next record.", e); - } else { - // Task already cancelled do nothing - next = 0; - } - } catch (IllegalStateException e) { - if (isRunning) { - throw new RuntimeException("Could not read next record.", e); - } else { - // Task already cancelled do nothing - next = 0; - } - } - - if (next == 0) { - break; - } else if (next == 1) { - operator.processElement1(reuse1.getObject()); - reuse1 = inputDeserializer1.createInstance(); - } else { - operator.processElement2(reuse2.getObject()); - reuse2 = inputDeserializer2.createInstance(); - } - } - - operator.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot invoke operator.", e); - } - - return mockContext.getOutputs(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 0d09c14..45ae88f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -17,34 +17,25 @@ package org.apache.flink.streaming.util; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.typeinfo.TypeInformation; 
-import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.io.IndexedReaderIterator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; public class MockContext<IN, OUT> { private Collection<IN> inputs; private List<OUT> outputs; private MockOutput<OUT> output; - private StreamRecordSerializer<IN> inDeserializer; - private IndexedReaderIterator<StreamRecord<IN>> iterator; public MockContext(Collection<IN> inputs) { this.inputs = inputs; @@ -52,58 +43,19 @@ public class MockContext<IN, OUT> { throw new RuntimeException("Inputs must not be empty"); } - TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); - inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo, new ExecutionConfig()); - - iterator = new IndexedInputIterator(); outputs = new ArrayList<OUT>(); output = new MockOutput<OUT>(outputs); } - private class IndexedInputIterator extends IndexedReaderIterator<StreamRecord<IN>> { - Iterator<IN> listIterator; - - public IndexedInputIterator() { - super(null, null); - listIterator = inputs.iterator(); - } - - @Override - public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException { - if (listIterator.hasNext()) { - reuse.setObject(listIterator.next()); - } else { - reuse = null; - } - return reuse; - } - - @Override - public StreamRecord<IN> next() throws IOException { - if (listIterator.hasNext()) { - StreamRecord<IN> result = 
inDeserializer.createInstance(); - result.setObject(listIterator.next()); - return result; - } else { - return null; - } - } - } - public List<OUT> getOutputs() { return outputs; } - public Collector<OUT> getOutput() { + public Output<StreamRecord<OUT>> getOutput() { return output; } - public MutableObjectIterator<StreamRecord<IN>> getIterator() { - return iterator; - } - - public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, - List<IN> inputs) { + public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) { MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, @@ -114,8 +66,8 @@ public class MockContext<IN, OUT> { operator.open(null); StreamRecord<IN> nextRecord; - while ((nextRecord = mockContext.getIterator().next()) != null) { - operator.processElement(nextRecord.getObject()); + for (IN in: inputs) { + operator.processElement(new StreamRecord<IN>(in)); } operator.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java index 6799d87..5371ba0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -22,9 +22,10 @@ import java.util.Collection; import 
org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.util.Collector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -public class MockOutput<T> implements Output<T> { +public class MockOutput<T> implements Output<StreamRecord<T>> { private Collection<T> outputs; public MockOutput(Collection<T> outputs) { @@ -32,13 +33,18 @@ public class MockOutput<T> implements Output<T> { } @Override - public void collect(T record) { + public void collect(StreamRecord<T> record) { T copied = SerializationUtils.deserialize(SerializationUtils - .serialize((Serializable) record)); + .serialize((Serializable) record.getValue())); outputs.add(copied); } @Override + public void emitWatermark(Watermark mark) { + throw new RuntimeException("THIS MUST BE IMPLEMENTED"); + } + + @Override public void close() { } } http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java deleted file mode 100644 index 1731e7c..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.mock; - -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamtask.MockRecordWriter; -import org.mockito.Mockito; - -public class MockRecordWriterFactory { - - @SuppressWarnings("unchecked") - public static MockRecordWriter create() { - MockRecordWriter recWriter = mock(MockRecordWriter.class); - - Mockito.when(recWriter.initList()).thenCallRealMethod(); - doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class)); - - recWriter.initList(); - - return recWriter; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java new file mode 100644 index 0000000..133f143 --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Queue; +import 
java.util.concurrent.ConcurrentLinkedQueue; + +/** + * A test harness for testing a {@link OneInputStreamOperator}. + * + * <p> + * This mock task provides the operator with a basic runtime context and allows pushing elements + * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements + * and watermarks can be retrieved. You are free to modify these. + */ +public class OneInputStreamOperatorTestHarness<IN, OUT> { + + OneInputStreamOperator<IN, OUT> operator; + + ConcurrentLinkedQueue outputList; + + ExecutionConfig executionConfig; + + public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) { + this.operator = operator; + + outputList = new ConcurrentLinkedQueue(); + + executionConfig = new ExecutionConfig(); + + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext( + "MockTwoInputTask", + new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), + getClass().getClassLoader(), + executionConfig, + null, + new LocalStateHandle.LocalStateHandleProvider<Serializable>(), + new HashMap<String, Accumulator<?, ?>>()); + + operator.setup(new MockOutput(), runtimeContext); + } + + /** + * Get all the output from the task. This contains StreamRecords and Events interleaved. Use + * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)} + * to extract only the StreamRecords. + */ + public Queue getOutput() { + return outputList; + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)} + * with an empty {@link org.apache.flink.configuration.Configuration}. + */ + public void open() throws Exception { + operator.open(new Configuration()); + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)} + * with the given {@link org.apache.flink.configuration.Configuration}. 
+ */ + public void open(Configuration config) throws Exception { + operator.open(config); + } + + /** + * Calls close on the operator. + */ + public void close() throws Exception { + operator.close(); + } + + public void processElement(StreamRecord<IN> element) throws Exception { + operator.processElement(element); + } + + public void processElements(Collection<StreamRecord<IN>> elements) throws Exception { + for (StreamRecord<IN> element: elements) { + operator.processElement(element); + } + } + + public void processWatermark(Watermark mark) throws Exception { + operator.processWatermark(mark); + } + + private class MockOutput implements Output<StreamRecord<OUT>> { + + private TypeSerializer<OUT> outputSerializer; + + @Override + @SuppressWarnings("unchecked") + public void emitWatermark(Watermark mark) { + outputList.add(mark); + } + + @Override + @SuppressWarnings("unchecked") + public void collect(StreamRecord<OUT> element) { + if (outputSerializer == null) { + outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); + } + outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()), + element.getTimestamp())); + } + + @Override + public void close() { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 764fe5f..2d7f6b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -29,10 +29,13 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import org.apache.flink.util.Collector; public class SourceFunctionUtil<T> { @@ -40,25 +43,20 @@ public class SourceFunctionUtil<T> { List<T> outputs = new ArrayList<T>(); if (sourceFunction instanceof RichFunction) { RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>()); + new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>()); ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext); ((RichFunction) sourceFunction).open(new Configuration()); } try { - final Collector<T> collector = new MockOutput<T>(outputs); - final Object lockObject = new Object(); - SourceFunction.SourceContext<T> ctx = new SourceFunction.SourceContext<T>() { - @Override - public void collect(T element) { - collector.collect(element); - } - - @Override - 
public Object getCheckpointLock() { - return lockObject; - } - }; + final Output<StreamRecord<T>> collector = new MockOutput<T>(outputs); + final Object lockingObject = new Object(); + SourceFunction.SourceContext<T> ctx; + if (sourceFunction instanceof EventTimeSourceFunction) { + ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector); + } else { + ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector); + } sourceFunction.run(ctx); } catch (Exception e) { throw new RuntimeException("Cannot invoke source.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java new file mode 100644 index 0000000..a0a6c8d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Assert; + +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * Utils for working with the various test harnesses. + */ +public class TestHarnessUtil { + /** + * Extracts the StreamRecords from the given output list. + */ + @SuppressWarnings("unchecked") + public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List output) { + List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>(); + for (Object e: output) { + if (e instanceof StreamRecord) { + resultElements.add((StreamRecord<OUT>) e); + } + } + return resultElements; + } + + /** + * Extracts the raw elements from the given output list. + */ + @SuppressWarnings("unchecked") + public static <OUT> List<OUT> getRawElementsFromOutput(Queue output) { + List<OUT> resultElements = new LinkedList<OUT>(); + for (Object e: output) { + if (e instanceof StreamRecord) { + resultElements.add((OUT) ((StreamRecord) e).getValue()); + } + } + return resultElements; + } + + /** + * Compare the two queues containing operator/task output by converting them to an array first. 
+ */ + public static void assertOutputEquals(String message, Queue expected, Queue actual) { + Assert.assertArrayEquals(message, + expected.toArray(), + actual.toArray()); + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java new file mode 100644 index 0000000..ea753f8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalStateHandle; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * A test harness for testing a {@link TwoInputStreamOperator}. + * + * <p> + * This mock task provides the operator with a basic runtime context and allows pushing elements + * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements + * and watermarks can be retrieved. You are free to modify these.
+ */ +public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { + + TwoInputStreamOperator<IN1, IN2, OUT> operator; + + ConcurrentLinkedQueue outputList; + + ExecutionConfig executionConfig; + + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) { + this.operator = operator; + + outputList = new ConcurrentLinkedQueue(); + + executionConfig = new ExecutionConfig(); + + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext( + "MockTwoInputTask", + new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), + getClass().getClassLoader(), + new ExecutionConfig(), + null, + new LocalStateHandle.LocalStateHandleProvider<Serializable>(), + new HashMap<String, Accumulator<?, ?>>()); + + operator.setup(new MockOutput(), runtimeContext); + } + + /** + * Get all the output from the task. This contains StreamRecords and Events interleaved. Use + * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)} + * to extract only the StreamRecords. + */ + public Queue getOutput() { + return outputList; + } + + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)} + * with an empty {@link Configuration}. + */ + public void open() throws Exception { + operator.open(new Configuration()); + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)} + * with the given {@link Configuration}. + */ + public void open(Configuration config) throws Exception { + operator.open(config); + } + + /** + * Calls close on the operator. 
+ */ + public void close() throws Exception { + operator.close(); + } + + public void processElement1(StreamRecord<IN1> element) throws Exception { + operator.processElement1(element); + } + + public void processElement2(StreamRecord<IN2> element) throws Exception { + operator.processElement2(element); + } + + public void processWatermark1(Watermark mark) throws Exception { + operator.processWatermark1(mark); + } + + public void processWatermark2(Watermark mark) throws Exception { + operator.processWatermark2(mark); + } + + private class MockOutput implements Output<StreamRecord<OUT>> { + + private TypeSerializer<OUT> outputSerializer; + + @Override + @SuppressWarnings("unchecked") + public void emitWatermark(Watermark mark) { + outputList.add(mark); + } + + @Override + @SuppressWarnings("unchecked") + public void collect(StreamRecord<OUT> element) { + if (outputSerializer == null) { + outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); + } + outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()), + element.getTimestamp())); + } + + @Override + public void close() { + // ignore + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java new file mode 100644 index 0000000..6197092 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * A 
simple test that runs a streaming topology with checkpointing enabled. This differs from + * {@link org.apache.flink.test.checkpointing.StreamCheckpointingITCase} in that it contains + * a TwoInput (or co-) Task. + * + * <p> + * This checks whether checkpoint barriers correctly trigger TwoInputTasks and also whether + * these barriers are correctly forwarded. + * + * <p> + * This uses a mixture of Operators with the {@link Checkpointed} interface and the new + * {@link org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext#getOperatorState} + * method. + * + * <p> + * The test triggers a failure after a while and verifies that, after completion, the + * state reflects the "exactly once" semantics. + */ +@SuppressWarnings("serial") +public class CoStreamCheckpointingITCase { + + private static final int NUM_TASK_MANAGERS = 2; + private static final int NUM_TASK_SLOTS = 3; + private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void startCluster() { + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); + config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms"); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); + + cluster = new ForkableFlinkMiniCluster(config, false); + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to start test cluster: " + e.getMessage()); + } + } + + @AfterClass + public static void shutdownCluster() { + try { + cluster.shutdown(); + cluster = null; + } + catch (Exception e) { + e.printStackTrace(); + fail("Failed to stop test cluster: " + e.getMessage()); + } + } + + + + /** + * Runs the following program: + * + * <pre> + * [ (source)->(filter)->(map) ] -> [ (co-map) ] -> [ (map) ] -> [
(groupBy/reduce)->(sink) ] + * </pre> + */ + @Test + public void runCheckpointedProgram() { + + final long NUM_STRINGS = 10000000L; + assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(500); + env.getConfig().disableSysoutLogging(); + + DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)); + + stream + // -------------- first vertex, chained to the source ---------------- + .filter(new StringRichFilterFunction()) + + // -------------- second vertex - the stateful one that also fails ---------------- + .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction()) + + // -------------- third vertex - the stateful one that also fails ---------------- + .map(new StringPrefixCountRichMapFunction()) + .startNewChain() + .map(new StatefulCounterFunction()) + + // -------------- fourth vertex - reducer and the sink ---------------- + .groupBy("prefix") + .reduce(new OnceFailingReducer(NUM_STRINGS)) + .addSink(new RichSinkFunction<PrefixCount>() { + + private Map<Character, Long> counts = new HashMap<Character, Long>(); + + @Override + public void invoke(PrefixCount value) { + Character first = value.prefix.charAt(0); + Long previous = counts.get(first); + if (previous == null) { + counts.put(first, value.count); + } else { + counts.put(first, Math.max(previous, value.count)); + } + } + +// @Override +// public void close() { +// for (Long count : counts.values()) { +// assertEquals(NUM_STRINGS / 40, count.longValue()); +// } +// } + }); + + env.execute(); + + long filterSum = 0; + for (long l : StringRichFilterFunction.counts) { + filterSum += l; + } + + long coMapSum = 0; + for (long l : LeftIdentityCoRichFlatMapFunction.counts) { + coMapSum += l; + } + + long mapSum = 0; + for (long l : 
StringPrefixCountRichMapFunction.counts) { + mapSum += l; + } + + long countSum = 0; + for (long l : StatefulCounterFunction.counts) { + countSum += l; + } + + if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) { + Assert.fail("Restore was never called on counting Map function."); + } + + if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) { + Assert.fail("Restore was never called on counting CoMap function."); + } + + // verify that we counted exactly right + + assertEquals(NUM_STRINGS, filterSum); + assertEquals(NUM_STRINGS, coMapSum); + assertEquals(NUM_STRINGS, mapSum); + assertEquals(NUM_STRINGS, countSum); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + // Custom Functions + // -------------------------------------------------------------------------------------------- + + private static class StringGeneratingSourceFunction extends RichSourceFunction<String> + implements ParallelSourceFunction<String> { + + private final long numElements; + + private Random rnd; + private StringBuilder stringBuilder; + + private OperatorState<Integer> index; + private int step; + + private volatile boolean isRunning; + + static final long[] counts = new long[PARALLELISM]; + @Override + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value(); + } + + + StringGeneratingSourceFunction(long numElements) { + this.numElements = numElements; + } + + @Override + public void open(Configuration parameters) throws IOException { + rnd = new Random(); + stringBuilder = new StringBuilder(); + step = getRuntimeContext().getNumberOfParallelSubtasks(); + + + index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false); + + isRunning = true; + } + + @Override + public void run(SourceContext<String> ctx) throws Exception { + final 
Object lockingObject = ctx.getCheckpointLock(); + + while (isRunning && index.value() < numElements) { + char first = (char) ((index.value() % 40) + 40); + + stringBuilder.setLength(0); + stringBuilder.append(first); + + String result = randomString(stringBuilder, rnd); + + synchronized (lockingObject) { + index.update(index.value() + step); +// System.out.println("SOURCE EMIT: " + result); + ctx.collect(result); + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + private static String randomString(StringBuilder bld, Random rnd) { + final int len = rnd.nextInt(10) + 5; + + for (int i = 0; i < len; i++) { + char next = (char) (rnd.nextInt(20000) + 33); + bld.append(next); + } + + return bld.toString(); + } + } + + private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> { + + private OperatorState<Long> count; + static final long[] counts = new long[PARALLELISM]; + + @Override + public PrefixCount map(PrefixCount value) throws Exception { + count.update(count.value() + 1); + return value; + } + + @Override + public void open(Configuration conf) throws IOException { + count = getRuntimeContext().getOperatorState("count", 0L, false); + } + + @Override + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value(); + } + + } + + private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> { + + private static volatile boolean hasFailed = false; + + private final long numElements; + + private long failurePos; + private long count; + + OnceFailingReducer(long numElements) { + this.numElements = numElements; + } + + @Override + public void open(Configuration parameters) { + long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); + long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); + + failurePos = (new Random().nextLong() % (failurePosMax - 
failurePosMin)) + failurePosMin; + count = 0; + } + + @Override + public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception { + count++; + if (!hasFailed && count >= failurePos) { + hasFailed = true; + throw new Exception("Test Failure"); + } + + value1.count += value2.count; + return value1; + } + } + + // -------------------------------------------------------------------------------------------- + // Custom Type Classes + // -------------------------------------------------------------------------------------------- + + public static class PrefixCount { + + public String prefix; + public String value; + public long count; + + public PrefixCount() {} + + public PrefixCount(String prefix, String value, long count) { + this.prefix = prefix; + this.value = value; + this.count = count; + } + + @Override + public String toString() { + return prefix + " / " + value; + } + } + + private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> { + + Long count = 0L; + static final long[] counts = new long[PARALLELISM]; + + @Override + public boolean filter(String value) { + count++; + return value.length() < 100; + } + + @Override + public void close() { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return count; + } + + @Override + public void restoreState(Long state) { + count = state; + } + } + + private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> { + + private long count = 0; + static final long[] counts = new long[PARALLELISM]; + static volatile boolean restoreCalledAtLeastOnce = false; + + @Override + public PrefixCount map(String value) throws IOException { + count += 1; + return new PrefixCount(value.substring(0, 1), value, 1L); + } + + @Override + public Long snapshotState(long 
checkpointId, long checkpointTimestamp) throws Exception { + return count; + } + + @Override + public void restoreState(Long state) { + restoreCalledAtLeastOnce = true; + count = state; + if (count == 0) { + throw new RuntimeException("Restore from beginning"); + } + } + + @Override + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + } + + private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> { + + long count = 0; + static final long[] counts = new long[PARALLELISM]; + + static volatile boolean restoreCalledAtLeastOnce = false; + + @Override + public void flatMap1(String value, Collector<String> out) throws IOException { + count += 1; +// System.out.println("Co-Map COUNT: " + count); + + out.collect(value); + } + + @Override + public void flatMap2(String value, Collector<String> out) throws IOException { + // we ignore the values from the second input + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return count; + } + + @Override + public void restoreState(Long state) { + restoreCalledAtLeastOnce = true; + count = state; + } + + @Override + public void close() throws IOException { + counts[getRuntimeContext().getIndexOfThisSubtask()] = count; + } + } +}