http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java deleted file mode 100644 index 6c1d0e6..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ /dev/null @@ -1,875 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.timestamp; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.StoppableFunction; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.NoOpSink; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; - -import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for timestamps, watermarks, and event-time sources. - */ -@SuppressWarnings("serial") -public class TimestampITCase extends TestLogger { - - private static final int NUM_TASK_MANAGERS = 2; - private static final int NUM_TASK_SLOTS = 3; - private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; - - // this is used in some tests to synchronize - static MultiShotLatch latch; - - - private static ForkableFlinkMiniCluster cluster; - - @Before - public void setupLatch() { - // ensure that we get a fresh latch for each test - latch = new MultiShotLatch(); - } - - - @BeforeClass - public static void startCluster() { - try { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - - cluster = new ForkableFlinkMiniCluster(config, false); - - cluster.start(); - } - catch (Exception e) { - e.printStackTrace(); - fail("Failed to start test cluster: " + e.getMessage()); - } - } - - @AfterClass - public static void shutdownCluster() { - try { - cluster.shutdown(); - cluster = null; - } - catch (Exception e) { - e.printStackTrace(); - fail("Failed to stop test cluster: " + e.getMessage()); - } - } - - /** - * These check whether custom timestamp emission works at sources and also whether timestamps - * arrive at operators throughout a topology. - * - * <p> - * This also checks whether watermarks keep propagating if a source closes early. - * - * <p> - * This only uses map to test the workings of watermarks in a complete, running topology. All - * tasks and stream operators have dedicated tests that test the watermark propagation - * behaviour. - */ - @Test - public void testWatermarkPropagation() throws Exception { - final int NUM_WATERMARKS = 10; - - long initialTime = 0L; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS)); - DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2)); - - source1.union(source2) - .map(new IdentityMap()) - .connect(source2).map(new IdentityCoMap()) - .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .addSink(new NoOpSink<Integer>()); - - env.execute(); - - // verify that all the watermarks arrived at the final custom operator - for (int i = 0; i < PARALLELISM; i++) { - // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the - // other source stops emitting after that - for (int j = 0; j < NUM_WATERMARKS / 2; j++) { - if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) { - System.err.println("All Watermarks: "); - for (int k = 0; k <= NUM_WATERMARKS / 2; k++) { - System.err.println(CustomOperator.finalWatermarks[i].get(k)); - } - - fail("Wrong watermark."); - } - } - - assertEquals(Watermark.MAX_WATERMARK, - CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1)); - } - } - - @Test - public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception { - - // for this test to work, we need to be sure that no other jobs are being executed - while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) { - Thread.sleep(100); - } - - final int NUM_WATERMARKS = 10; - - long initialTime = 0L; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS)); - DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2)); - - source1.union(source2) - .map(new IdentityMap()) - .connect(source2).map(new IdentityCoMap()) - .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .addSink(new NoOpSink<Integer>()); - - new Thread("stopper") { - @Override - public void run() { - try { - // try until we get the running jobs - List<JobID> running; - while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) { - Thread.sleep(50); - } - - JobID id = running.get(0); - - // send stop until the job is stopped - do { - cluster.stopJob(id); - Thread.sleep(50); - } while (!cluster.getCurrentlyRunningJobsJava().isEmpty()); - } - catch (Throwable t) { - t.printStackTrace(); - } - } - }.start(); - - env.execute(); - - // verify that all the watermarks arrived at the final custom operator - for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) { - - // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the - // other source stops emitting after that - for (int j = 0; j < subtaskWatermarks.size(); j++) { - if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) { - System.err.println("All Watermarks: "); - for (int k = 0; k <= NUM_WATERMARKS / 2; k++) { - System.err.println(subtaskWatermarks.get(k)); - } - - fail("Wrong watermark."); - } - } - - // if there are watermarks, the final one must not be the MAX watermark - if (subtaskWatermarks.size() > 0) { - assertNotEquals(Watermark.MAX_WATERMARK, - subtaskWatermarks.get(subtaskWatermarks.size()-1)); - } - } - } - - /** - * These check whether timestamps are properly assigned at the sources and handled in - * network transmission and between chained operators when timestamps are enabled. - */ - @Test - public void testTimestampHandling() throws Exception { - final int NUM_ELEMENTS = 10; - - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS)); - DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS)); - - source1 - .map(new IdentityMap()) - .connect(source2).map(new IdentityCoMap()) - .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()) - .addSink(new NoOpSink<Integer>()); - - - env.execute(); - } - - /** - * These check whether timestamps are properly ignored when they are disabled. - */ - @Test - public void testDisabledTimestamps() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS)); - DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS)); - - source1 - .map(new IdentityMap()) - .connect(source2).map(new IdentityCoMap()) - .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator()) - .addSink(new NoOpSink<Integer>()); - - env.execute(); - } - - /** - * This tests whether timestamps are properly extracted in the timestamp - * extractor and whether watermarks are also correctly forwared from this with the auto watermark - * interval. - */ - @Test - public void testTimestampExtractorWithAutoInterval() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.getConfig().setAutoWatermarkInterval(10); - env.setParallelism(1); - env.getConfig().disableSysoutLogging(); - - - DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - int index = 1; - while (index <= NUM_ELEMENTS) { - ctx.collect(index); - latch.await(); - index++; - } - } - - @Override - public void cancel() {} - }); - - DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks( - new AscendingTimestampExtractor<Integer>() { - @Override - public long extractAscendingTimestamp(Integer element) { - return element; - } - }); - - extractOp - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .transform("Timestamp Check", - BasicTypeInfo.INT_TYPE_INFO, - new TimestampCheckingOperator()); - - // verify that extractor picks up source parallelism - Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism()); - - env.execute(); - - // verify that we get NUM_ELEMENTS watermarks - for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { - long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp(); - Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]); - } - } - - // the input is finite, so it should have a MAX Watermark - assertEquals(Watermark.MAX_WATERMARK, - CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); - } - - /** - * This thests whether timestamps are properly extracted in the timestamp - * extractor and whether watermark are correctly forwarded from the custom watermark emit - * function. - */ - @Test - public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.getConfig().setAutoWatermarkInterval(10); - env.setParallelism(1); - env.getConfig().disableSysoutLogging(); - - - DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - int index = 1; - while (index <= NUM_ELEMENTS) { - ctx.collect(index); - latch.await(); - index++; - } - } - - @Override - public void cancel() {} - }); - - source1 - .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { - - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } - - @Override - public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) { - return new Watermark(extractedTimestamp - 1); - } - }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); - - - env.execute(); - - // verify that we get NUM_ELEMENTS watermarks - for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { - Assert.fail("Wrong watermark."); - } - } - - // the input is finite, so it should have a MAX Watermark - assertEquals(Watermark.MAX_WATERMARK, - CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); - } - - /** - * This test verifies that the timestamp extractor does not emit decreasing watermarks even - * - */ - @Test - public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.getConfig().setAutoWatermarkInterval(1); - env.setParallelism(1); - env.getConfig().disableSysoutLogging(); - - - DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - int index = 1; - while (index <= NUM_ELEMENTS) { - ctx.collect(index); - Thread.sleep(100); - ctx.collect(index - 1); - latch.await(); - index++; - } - } - - @Override - public void cancel() {} - }); - - source1 - .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { - - @Override - public long extractTimestamp(Integer element, long previousTimestamp) { - return element; - } - - @Override - public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) { - return new Watermark(extractedTimestamp - 1); - } - }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) - .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); - - - env.execute(); - - // verify that we get NUM_ELEMENTS watermarks - for (int j = 0; j < NUM_ELEMENTS; j++) { - if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) { - Assert.fail("Wrong watermark."); - } - } - // the input is finite, so it should have a MAX Watermark - assertEquals(Watermark.MAX_WATERMARK, - CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1)); - } - - /** - * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. - */ - @Test - public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.getConfig().setAutoWatermarkInterval(1); - env.setParallelism(2); - env.getConfig().disableSysoutLogging(); - - - DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - int index = 1; - while (index <= NUM_ELEMENTS) { - ctx.collectWithTimestamp(index, index); - ctx.collectWithTimestamp(index - 1, index - 1); - index++; - ctx.emitWatermark(new Watermark(index-2)); - } - - // emit the final Long.MAX_VALUE watermark, do it twice and verify that - // we only see one in the result - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - @Override - public void cancel() {} - }); - - source1 - .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() { - - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } - - @Override - public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) { - return null; - } - }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); - - - env.execute(); - - Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); - Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); - } - - /** - * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. - * - * Same test as before, but using a different timestamp extractor - */ - @Test - public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception { - final int NUM_ELEMENTS = 10; - - StreamExecutionEnvironment env = StreamExecutionEnvironment - .createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.getConfig().setAutoWatermarkInterval(10); - env.setParallelism(2); - env.getConfig().disableSysoutLogging(); - - DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - int index = 1; - while (index <= NUM_ELEMENTS) { - ctx.collectWithTimestamp(index, index); - ctx.collectWithTimestamp(index - 1, index - 1); - index++; - ctx.emitWatermark(new Watermark(index-2)); - } - - // emit the final Long.MAX_VALUE watermark, do it twice and verify that - // we only see one in the result - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - @Override - public void cancel() {} - }); - - source1 - .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() { - - @Override - public long extractTimestamp(Integer element, long currentTimestamp) { - return element; - } - - @Override - public Watermark getCurrentWatermark() { - return null; - } - }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); - - env.execute(); - - Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); - Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); - } - - /** - * This verifies that an event time source works when setting stream time characteristic to - * processing time. In this case, the watermarks should just be swallowed. - */ - @Test - public void testEventTimeSourceWithProcessingTime() throws Exception { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(2); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10)); - - source1 - .map(new IdentityMap()) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false)); - - env.execute(); - - // verify that we don't get any watermarks, the source is used as watermark source in - // other tests, so it normally emits watermarks - Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0); - } - - @Test - public void testErrorOnEventTimeOverProcessingTime() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(2); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - DataStream<Tuple2<String, Integer>> source1 = - env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2)); - - source1 - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.seconds(5))) - .reduce(new ReduceFunction<Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { - return value1; - } - }) - .print(); - - try { - env.execute(); - fail("this should fail with an exception"); - } catch (Exception e) { - // expected - } - } - - @Test - public void testErrorOnEventTimeWithoutTimestamps() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(2); - env.getConfig().disableSysoutLogging(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - DataStream<Tuple2<String, Integer>> source1 = - env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2)); - - source1 - .keyBy(0) - .window(TumblingEventTimeWindows.of(Time.seconds(5))) - .reduce(new ReduceFunction<Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) { - return value1; - } - }) - .print(); - - try { - env.execute(); - fail("this should fail with an exception"); - } catch (Exception e) { - // expected - } - } - - // ------------------------------------------------------------------------ - // Custom Operators and Functions - // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") - public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { - - List<Watermark> watermarks; - public static List<Watermark>[] finalWatermarks = new List[PARALLELISM]; - private final boolean timestampsEnabled; - - public CustomOperator(boolean timestampsEnabled) { - setChainingStrategy(ChainingStrategy.ALWAYS); - this.timestampsEnabled = timestampsEnabled; - } - - @Override - public void processElement(StreamRecord<Integer> element) throws Exception { - if (timestampsEnabled) { - if (element.getTimestamp() != element.getValue()) { - Assert.fail("Timestamps are not properly handled."); - } - } - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - for (Watermark previousMark: watermarks) { - assertTrue(previousMark.getTimestamp() < mark.getTimestamp()); - } - watermarks.add(mark); - latch.trigger(); - output.emitWatermark(mark); - } - - @Override - public void open() throws Exception { - super.open(); - watermarks = new ArrayList<>(); - } - - @Override - public void close() throws Exception { - super.close(); - finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks; - } - } - - public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { - - public TimestampCheckingOperator() { - setChainingStrategy(ChainingStrategy.ALWAYS); - } - - @Override - public void processElement(StreamRecord<Integer> element) throws Exception { - if (element.getTimestamp() != element.getValue()) { - Assert.fail("Timestamps are not properly handled."); - } - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception {} - } - - public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { - - @Override - public void processElement(StreamRecord<Integer> element) throws Exception { - if (element.hasTimestamp()) { - Assert.fail("Timestamps are not properly handled."); - } - output.collect(element); - } - - @Override - public void processWatermark(Watermark mark) throws Exception {} - } - - public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> { - @Override - public Integer map1(Integer value) throws Exception { - return value; - } - - @Override - public Integer map2(Integer value) throws Exception { - return value; - } - } - - public static class IdentityMap implements MapFunction<Integer, Integer> { - @Override - public Integer map(Integer value) throws Exception { - return value; - } - } - - public static class MyTimestampSource implements SourceFunction<Integer> { - - private final long initialTime; - private final int numWatermarks; - - public MyTimestampSource(long initialTime, int numWatermarks) { - this.initialTime = initialTime; - this.numWatermarks = numWatermarks; - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - for (int i = 0; i < numWatermarks; i++) { - ctx.collectWithTimestamp(i, initialTime + i); - ctx.emitWatermark(new Watermark(initialTime + i)); - } - } - - @Override - public void cancel() {} - } - - public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction { - - private final long initialTime; - private final int numWatermarks; - - private volatile boolean running = true; - - public MyTimestampSourceInfinite(long initialTime, int numWatermarks) { - this.initialTime = initialTime; - this.numWatermarks = numWatermarks; - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - for (int i = 0; i < numWatermarks; i++) { - ctx.collectWithTimestamp(i, initialTime + i); - ctx.emitWatermark(new Watermark(initialTime + i)); - } - - while (running) { - Thread.sleep(20); - } - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void stop() { - running = false; - } - } - - public static class MyNonWatermarkingSource implements SourceFunction<Integer> { - - int numWatermarks; - - public MyNonWatermarkingSource(int numWatermarks) { - this.numWatermarks = numWatermarks; - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - for (int i = 0; i < numWatermarks; i++) { - ctx.collect(i); - } - } - - @Override - public void cancel() {} - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java deleted file mode 100644 index d398121..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - -public final class NoOpSink<T> extends RichSinkFunction<T> { - public void invoke(T tuple) { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java deleted file mode 100644 index a46ff55..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertTrue; - -public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> { - private List<T> received; - - public void invoke(T tuple) { - received.add(tuple); - } - - public void open(Configuration conf) { - received = new ArrayList<T>(); - } - - public void close() { - assertTrue(received.size() > 0); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java deleted file mode 100644 index 7d6a6d0..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.util.NetUtils; - -import org.junit.Assert; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -/** - * Test base for streaming programs relying on an open server socket to write to. - */ -public abstract class SocketOutputTestBase extends StreamingProgramTestBase { - - protected static final String HOST = "localhost"; - protected static Integer port; - protected Set<String> dataReadFromSocket = new HashSet<String>(); - - @Override - protected void preSubmit() throws Exception { - port = NetUtils.getAvailablePort(); - temporarySocket = createLocalSocket(port); - } - - @Override - protected void postSubmit() throws Exception { - Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n"))); - Assert.assertEquals(expectedData, dataReadFromSocket); - temporarySocket.close(); - } - - protected ServerSocket temporarySocket; - - public ServerSocket createLocalSocket(int port) throws Exception { - ServerSocket serverSocket = new ServerSocket(port); - ServerThread st = new ServerThread(serverSocket); - st.start(); - return serverSocket; - } - - protected class ServerThread extends Thread { - - private ServerSocket serverSocket; - private Thread t; - - public ServerThread(ServerSocket serverSocket) { - this.serverSocket = serverSocket; - t = new Thread(this); - } - - public void waitForAccept() throws Exception { - Socket socket = serverSocket.accept(); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - DeserializationSchema<String> schema = new SimpleStringSchema(); - String rawData = in.readLine(); - while (rawData != null){ - String string = schema.deserialize(rawData.getBytes()); - dataReadFromSocket.add(string); - rawData = in.readLine(); - } - socket.close(); - } - - public void run() { - try { - waitForAccept(); - } catch (Exception e) { - Assert.fail(); - throw new RuntimeException(e); - } - } - - @Override - public void start() { - t.start(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java deleted file mode 100644 index 8cdedd5..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; -import org.apache.flink.test.util.TestBaseUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -/** - * Base class for streaming unit tests that run multiple tests and want to reuse the same - * Flink cluster. This saves a significant amount of time, since the startup and - * shutdown of the Flink clusters (including actor systems, etc) usually dominates - * the execution of the actual tests. - * - * To write a unit test against this test base, simply extend it and add - * one or more regular test methods and retrieve the StreamExecutionEnvironment from - * the context: - * - * <pre> - * {@literal @}Test - * public void someTest() { - * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - * // test code - * env.execute(); - * } - * - * {@literal @}Test - * public void anotherTest() { - * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - * // test code - * env.execute(); - * } - * - * </pre> - */ -public class StreamingMultipleProgramsTestBase extends AbstractTestBase { - - // ------------------------------------------------------------------------ - // The mini cluster that is shared across tests - // ------------------------------------------------------------------------ - - protected static final int DEFAULT_PARALLELISM = 4; - - protected static ForkableFlinkMiniCluster cluster; - - public StreamingMultipleProgramsTestBase() { - super(new Configuration()); - } - - // ------------------------------------------------------------------------ - // Cluster setup & teardown - // ------------------------------------------------------------------------ - - @BeforeClass - public static void setup() throws Exception { - cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true); - TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); - } - - @AfterClass - public static void teardown() throws Exception { - TestStreamEnvironment.unsetAsContext(); - stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java deleted file mode 100644 index 50ed1cf..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.AbstractTestBase; - -import org.junit.Test; - -import static org.junit.Assert.fail; - -public abstract class StreamingProgramTestBase extends AbstractTestBase { - - protected static final int DEFAULT_PARALLELISM = 4; - - private int parallelism; - - - public StreamingProgramTestBase() { - super(new Configuration()); - setParallelism(DEFAULT_PARALLELISM); - } - - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - setTaskManagerNumSlots(parallelism); - } - - public int getParallelism() { - return parallelism; - } - - - // -------------------------------------------------------------------------------------------- - // Methods to create the test program and for pre- and post- test work - // -------------------------------------------------------------------------------------------- - - protected abstract void testProgram() throws Exception; - - protected void preSubmit() throws Exception {} - - protected void postSubmit() throws Exception {} - - // -------------------------------------------------------------------------------------------- - // Test entry point - // -------------------------------------------------------------------------------------------- - - @Test - public void testJob() throws Exception { - try { - // pre-submit - try { - preSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Pre-submit work caused an error: " + e.getMessage()); - } - - // prepare the test environment - startCluster(); - - TestStreamEnvironment.setAsContext(this.executor, getParallelism()); - - // call the test program - try { - testProgram(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Error while calling the test program: " + e.getMessage()); - } - finally { - TestStreamEnvironment.unsetAsContext(); - } - - // post-submit - try { - postSubmit(); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Post-submit work caused an error: " + e.getMessage()); - } - } - finally { - stopCluster(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java deleted file mode 100644 index 423d08e..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.TreeSet; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - -public class TestListResultSink<T> extends RichSinkFunction<T> { - - private static final long serialVersionUID = 1L; - private int resultListId; - - public TestListResultSink() { - this.resultListId = TestListWrapper.getInstance().createList(); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - } - - @Override - public void invoke(T value) throws Exception { - synchronized (resultList()) { - resultList().add(value); - } - } - - @Override - public void close() throws Exception { - super.close(); - } - - @SuppressWarnings("unchecked") - private List<T> resultList() { - synchronized (TestListWrapper.getInstance()) { - return (List<T>) TestListWrapper.getInstance().getList(resultListId); - } - } - - public List<T> getResult() { - synchronized (resultList()) { - ArrayList<T> copiedList = new ArrayList<T>(resultList()); - return copiedList; - } - } - - public List<T> getSortedResult() { - synchronized (resultList()) { - TreeSet<T> treeSet = new TreeSet<T>(resultList()); - ArrayList<T> sortedList = new ArrayList<T>(treeSet); - return sortedList; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java deleted file mode 100644 index 751f836..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class TestListWrapper { - - private static TestListWrapper instance; - - @SuppressWarnings("rawtypes") - private List<List<? extends Comparable>> lists; - - @SuppressWarnings("rawtypes") - private TestListWrapper() { - lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>()); - } - - public static TestListWrapper getInstance() { - if (instance == null) { - instance = new TestListWrapper(); - } - return instance; - } - - /** - * Creates and stores a list, returns with the id. - * - * @return The ID of the list. - */ - @SuppressWarnings("rawtypes") - public int createList() { - lists.add(new ArrayList<Comparable>()); - return lists.size() - 1; - } - - public List<?> getList(int listId) { - @SuppressWarnings("rawtypes") - List<? extends Comparable> list = lists.get(listId); - if (list == null) { - throw new RuntimeException("No such list."); - } - - return list; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java deleted file mode 100644 index c700102..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.util; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; -import org.apache.flink.util.Preconditions; - -/** - * A StreamExecutionEnvironment that executes its jobs on a test cluster. - */ -public class TestStreamEnvironment extends StreamExecutionEnvironment { - - /** The mini cluster in which this environment executes its jobs */ - private ForkableFlinkMiniCluster executor; - - - public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { - this.executor = Preconditions.checkNotNull(executor); - setParallelism(parallelism); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - final StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - final JobGraph jobGraph = streamGraph.getJobGraph(); - return executor.submitJobAndWait(jobGraph, false); - } - - // ------------------------------------------------------------------------ - - /** - * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on - * the given cluster with the given default parallelism. - * - * @param cluster The test cluster to run the test program on. - * @param parallelism The default parallelism for the test programs. - */ - public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) { - - StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() { - @Override - public StreamExecutionEnvironment createExecutionEnvironment() { - return new TestStreamEnvironment(cluster, parallelism); - } - }; - - initializeContextEnvironment(factory); - } - - /** - * Resets the streaming context environment to null. - */ - public static void unsetAsContext() { - resetContextEnvironment(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index ffbcb87..b82faf1 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -98,15 +98,6 @@ under the License. <type>test-jar</type> </dependency> - <!-- To access streaming test utils --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java deleted file mode 100644 index 7b3ed67..0000000 --- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.scala.api; - -import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms; -import org.apache.flink.streaming.util.SocketOutputTestBase; -import org.apache.flink.test.testdata.WordCountData; -import org.junit.Ignore; - -@Ignore -//This test sometimes fails most likely due to the behaviour -//of the socket. Disabled for now. -public class SocketOutputFormatITCase extends SocketOutputTestBase { - - @Override - protected void testProgram() throws Exception { - OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/pom.xml ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 3c35cf1..238c2da 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -58,6 +58,13 @@ under the License. </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java new file mode 100644 index 0000000..c5fbaf0 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.TestBaseUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * Base class for streaming unit tests that run multiple tests and want to reuse the same + * Flink cluster. This saves a significant amount of time, since the startup and + * shutdown of the Flink clusters (including actor systems, etc) usually dominates + * the execution of the actual tests. + * + * To write a unit test against this test base, simply extend it and add + * one or more regular test methods and retrieve the StreamExecutionEnvironment from + * the context: + * + * <pre> + * {@literal @}Test + * public void someTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * {@literal @}Test + * public void anotherTest() { + * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + * // test code + * env.execute(); + * } + * + * </pre> + */ +public class StreamingMultipleProgramsTestBase extends AbstractTestBase { + + // ------------------------------------------------------------------------ + // The mini cluster that is shared across tests + // ------------------------------------------------------------------------ + + protected static final int DEFAULT_PARALLELISM = 4; + + protected static ForkableFlinkMiniCluster cluster; + + public StreamingMultipleProgramsTestBase() { + super(new Configuration()); + } + + // ------------------------------------------------------------------------ + // Cluster setup & teardown + // ------------------------------------------------------------------------ + + @BeforeClass + public static void setup() throws Exception { + cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true); + TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM); + } + + @AfterClass + public static void teardown() throws Exception { + TestStreamEnvironment.unsetAsContext(); + stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java new file mode 100644 index 0000000..50ed1cf --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; + +import static org.junit.Assert.fail; + +public abstract class StreamingProgramTestBase extends AbstractTestBase { + + protected static final int DEFAULT_PARALLELISM = 4; + + private int parallelism; + + + public StreamingProgramTestBase() { + super(new Configuration()); + setParallelism(DEFAULT_PARALLELISM); + } + + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + setTaskManagerNumSlots(parallelism); + } + + public int getParallelism() { + return parallelism; + } + + + // -------------------------------------------------------------------------------------------- + // Methods to create the test program and for pre- and post- test work + // -------------------------------------------------------------------------------------------- + + protected abstract void testProgram() throws Exception; + + protected void preSubmit() throws Exception {} + + protected void postSubmit() throws Exception {} + + // -------------------------------------------------------------------------------------------- + // Test entry point + // -------------------------------------------------------------------------------------------- + + @Test + public void testJob() throws Exception { + try { + // pre-submit + try { + preSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Pre-submit work caused an error: " + e.getMessage()); + } + + // prepare the test environment + startCluster(); + + TestStreamEnvironment.setAsContext(this.executor, getParallelism()); + + // call the test program + try { + testProgram(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Error while calling the test program: " + e.getMessage()); + } + finally { + TestStreamEnvironment.unsetAsContext(); + } + + // post-submit + try { + postSubmit(); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Post-submit work caused an error: " + e.getMessage()); + } + } + finally { + stopCluster(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java new file mode 100644 index 0000000..c700102 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Preconditions; + +/** + * A StreamExecutionEnvironment that executes its jobs on a test cluster. + */ +public class TestStreamEnvironment extends StreamExecutionEnvironment { + + /** The mini cluster in which this environment executes its jobs */ + private ForkableFlinkMiniCluster executor; + + + public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { + this.executor = Preconditions.checkNotNull(executor); + setParallelism(parallelism); + } + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + final StreamGraph streamGraph = getStreamGraph(); + streamGraph.setJobName(jobName); + final JobGraph jobGraph = streamGraph.getJobGraph(); + return executor.submitJobAndWait(jobGraph, false); + } + + // ------------------------------------------------------------------------ + + /** + * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on + * the given cluster with the given default parallelism. + * + * @param cluster The test cluster to run the test program on. + * @param parallelism The default parallelism for the test programs. + */ + public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) { + + StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment() { + return new TestStreamEnvironment(cluster, parallelism); + } + }; + + initializeContextEnvironment(factory); + } + + /** + * Resets the streaming context environment to null. + */ + public static void unsetAsContext() { + resetContextEnvironment(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 9fd8c3e..77216e0 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -542,7 +542,7 @@ under the License. </goals> </pluginExecutionFilter> <action> - <ignore></ignore> + <ignore/> </action> </pluginExecution> </pluginExecutions> http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java new file mode 100644 index 0000000..5d99de4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.util.MathUtils; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SplitStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.List; + +public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath1; + private String resultPath2; + private String expected1; + private String expected2; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath1 = tempFolder.newFile().toURI().toString(); + resultPath2 = tempFolder.newFile().toURI().toString(); + expected1 = ""; + expected2 = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected1, resultPath1); + compareResultsByLinesInMemory(expected2, resultPath2); + } + + /** + * Tests the proper functioning of the streaming fold operator. For this purpose, a stream + * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple + * value. Each group is folded where the second tuple value is summed up. + * + * This test relies on the hash function used by the {@link DataStream#keyBy}, which is + * assumed to be {@link MathUtils#murmurHash}. + */ + @Test + public void testGroupedFoldOperation() throws Exception { + int numElements = 10; + final int numKeys = 2; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys)); + + SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream + .keyBy(0) + .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception { + return accumulator + value.f1; + } + }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() { + int key = -1; + @Override + public Tuple2<Integer, Integer> map(Integer value) throws Exception { + if (key == -1){ + key = MathUtils.murmurHash(value) % numKeys; + } + return new Tuple2<>(key, value); + } + }).split(new OutputSelector<Tuple2<Integer, Integer>>() { + @Override + public Iterable<String> select(Tuple2<Integer, Integer> value) { + List<String> output = new ArrayList<>(); + + output.add(value.f0 + ""); + return output; + } + }); + + splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() { + @Override + public Integer map(Tuple2<Integer, Integer> value) throws Exception { + return value.f1; + } + }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer map(Tuple2<Integer, Integer> value) throws Exception { + return value.f1; + } + }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder1 = new StringBuilder(); + StringBuilder builder2 = new StringBuilder(); + int counter1 = 0; + int counter2 = 0; + + for (int i = 0; i < numElements; i++) { + if (MathUtils.murmurHash(i) % numKeys == 0) { + counter1 += i; + builder1.append(counter1 + "\n"); + } else { + counter2 += i; + builder2.append(counter2 + "\n"); + } + } + + expected1 = builder1.toString(); + expected2 = builder2.toString(); + + env.execute(); + } + + /** + * Tests whether the fold operation can also be called with non Java serializable types. + */ + @Test + public void testFoldOperationWithNonJavaSerializableType() throws Exception { + final int numElements = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements)); + + input + .keyBy(0) + .fold( + new NonSerializable(42), + new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() { + @Override + public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception { + return new NonSerializable(accumulator.value + value.f1.value); + } + }) + .map(new MapFunction<NonSerializable, Integer>() { + @Override + public Integer map(NonSerializable value) throws Exception { + return value.value; + } + }) + .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE); + + StringBuilder builder = new StringBuilder(); + + for (int i = 0; i < numElements; i++) { + builder.append(42 + i + "\n"); + } + + expected1 = builder.toString(); + + env.execute(); + } + + private static class NonSerializable { + // This makes the type non-serializable + private final Object obj = new Object(); + + private final int value; + + public NonSerializable(int value) { + this.value = value; + } + } + + private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> { + private final int numElements; + + public NonSerializableTupleSource(int numElements) { + this.numElements = numElements; + } + + + @Override + public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + ctx.collect(new Tuple2<>(i, new NonSerializable(i))); + } + } + + @Override + public void cancel() {} + } + + private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> { + + private final int numElements; + private final int numKeys; + + public TupleSource(int numElements, int numKeys) { + this.numElements = numElements; + this.numKeys = numKeys; + } + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + for (int i = 0; i < numElements; i++) { + // keys '1' and '2' hash to different buckets + Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i); + ctx.collect(result); + } + } + + @Override + public void cancel() { + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java new file mode 100644 index 0000000..c2155ac --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.outputformat; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.util.Collector; + +public class CsvOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> text = env.fromElements(WordCountData.TEXT); + + DataStream<Tuple2<String, Integer>> counts = text + .flatMap(new Tokenizer()) + .keyBy(0).sum(1); + + counts.writeAsCsv(resultPath); + + env.execute("WriteAsCsvTest"); + } + + @Override + protected void postSubmit() throws Exception { + //Strip the parentheses from the expected text like output + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES + .replaceAll("[\\\\(\\\\)]", ""), resultPath); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java new file mode 100644 index 0000000..2940e6d --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.outputformat; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.test.testdata.WordCountData; + +public class TextOutputFormatITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> text = env.fromElements(WordCountData.TEXT); + + DataStream<Tuple2<String, Integer>> counts = text + .flatMap(new CsvOutputFormatITCase.Tokenizer()) + .keyBy(0).sum(1); + + counts.writeAsText(resultPath); + + env.execute("WriteAsTextTest"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java new file mode 100644 index 0000000..d21985b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.runtime; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.junit.Test; + +import static org.junit.Assert.assertNotEquals; + +@SuppressWarnings("serial") +public class ChainedRuntimeContextITCase extends StreamingMultipleProgramsTestBase { + private static RuntimeContext srcContext; + private static RuntimeContext mapContext; + + @Test + public void test() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink<Integer>()); + env.execute(); + + assertNotEquals(srcContext, mapContext); + + } + + private static class TestSource extends RichParallelSourceFunction<Integer> { + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + + @Override + public void open(Configuration c) { + srcContext = getRuntimeContext(); + } + + } + + private static class TestMap extends RichMapFunction<Integer, Integer> { + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + + @Override + public void open(Configuration c) { + mapContext = getRuntimeContext(); + } + + } + +}