http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
deleted file mode 100644
index 6c1d0e6..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.timestamp;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for timestamps, watermarks, and event-time sources.
- */
-@SuppressWarnings("serial")
-public class TimestampITCase extends TestLogger {
-
-       private static final int NUM_TASK_MANAGERS = 2;
-       private static final int NUM_TASK_SLOTS = 3;
-       private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-       // this is used in some tests to synchronize
-       static MultiShotLatch latch;
-
-
-       private static ForkableFlinkMiniCluster cluster;
-
-       @Before
-       public void setupLatch() {
-               // ensure that we get a fresh latch for each test
-               latch = new MultiShotLatch();
-       }
-
-
-       @BeforeClass
-       public static void startCluster() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-                       config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-                       cluster = new ForkableFlinkMiniCluster(config, false);
-
-                       cluster.start();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Failed to start test cluster: " + e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void shutdownCluster() {
-               try {
-                       cluster.shutdown();
-                       cluster = null;
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Failed to stop test cluster: " + e.getMessage());
-               }
-       }
-
-       /**
-        * These check whether custom timestamp emission works at sources and also whether timestamps
-        * arrive at operators throughout a topology.
-        *
-        * <p>
-        * This also checks whether watermarks keep propagating if a source closes early.
-        *
-        * <p>
-        * This only uses map to test the workings of watermarks in a complete, running topology. All
-        * tasks and stream operators have dedicated tests that test the watermark propagation
-        * behaviour.
-        */
-       @Test
-       public void testWatermarkPropagation() throws Exception {
-               final int NUM_WATERMARKS = 10;
-
-               long initialTime = 0L;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-               
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(PARALLELISM);
-               env.getConfig().disableSysoutLogging();
-
-               DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
-               DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
-
-               source1.union(source2)
-                               .map(new IdentityMap())
-                               .connect(source2).map(new IdentityCoMap())
-                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-                               .addSink(new NoOpSink<Integer>());
-
-               env.execute();
-
-               // verify that all the watermarks arrived at the final custom operator
-               for (int i = 0; i < PARALLELISM; i++) {
-                       // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
-                       // other source stops emitting after that
-                       for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-                               if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
-                                       System.err.println("All Watermarks: ");
-                                       for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-                                               System.err.println(CustomOperator.finalWatermarks[i].get(k));
-                                       }
-
-                                       fail("Wrong watermark.");
-                               }
-                       }
-
-                       assertEquals(Watermark.MAX_WATERMARK,
-                                       CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
-               }
-       }
-
-       @Test
-       public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
-
-               // for this test to work, we need to be sure that no other jobs are being executed
-               while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-                       Thread.sleep(100);
-               }
-               
-               final int NUM_WATERMARKS = 10;
-
-               long initialTime = 0L;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(PARALLELISM);
-               env.getConfig().disableSysoutLogging();
-
-               DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
-               DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
-
-               source1.union(source2)
-                               .map(new IdentityMap())
-                               .connect(source2).map(new IdentityCoMap())
-                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-                               .addSink(new NoOpSink<Integer>());
-
-               new Thread("stopper") {
-                       @Override
-                       public void run() {
-                               try {
-                                       // try until we get the running jobs
-                                       List<JobID> running;
-                                       while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
-                                               Thread.sleep(50);
-                                       }
-
-                                       JobID id = running.get(0);
-                                       
-                                       // send stop until the job is stopped
-                                       do {
-                                               cluster.stopJob(id);
-                                               Thread.sleep(50);
-                                       } while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
-                               }
-                               catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
-                       }
-               }.start();
-               
-               env.execute();
-
-               // verify that all the watermarks arrived at the final custom operator
-               for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
-
-                       // we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
-                       // other source stops emitting after that
-                       for (int j = 0; j < subtaskWatermarks.size(); j++) {
-                               if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
-                                       System.err.println("All Watermarks: ");
-                                       for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-                                               System.err.println(subtaskWatermarks.get(k));
-                                       }
-
-                                       fail("Wrong watermark.");
-                               }
-                       }
-
-                       // if there are watermarks, the final one must not be the MAX watermark
-                       if (subtaskWatermarks.size() > 0) {
-                               assertNotEquals(Watermark.MAX_WATERMARK,
-                                               subtaskWatermarks.get(subtaskWatermarks.size()-1));
-                       }
-               }
-       }
-
-       /**
-        * These check whether timestamps are properly assigned at the sources and handled in
-        * network transmission and between chained operators when timestamps are enabled.
-        */
-       @Test
-       public void testTimestampHandling() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(PARALLELISM);
-               env.getConfig().disableSysoutLogging();
-
-               DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-               DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-
-               source1
-                               .map(new IdentityMap())
-                               .connect(source2).map(new IdentityCoMap())
-                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
-                               .addSink(new NoOpSink<Integer>());
-
-
-               env.execute();
-       }
-
-       /**
-        * These check whether timestamps are properly ignored when they are disabled.
-        */
-       @Test
-       public void testDisabledTimestamps() throws Exception {
-               final int NUM_ELEMENTS = 10;
-               
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-               env.setParallelism(PARALLELISM);
-               env.getConfig().disableSysoutLogging();
-               
-               DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-               DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-
-               source1
-                               .map(new IdentityMap())
-                               .connect(source2).map(new IdentityCoMap())
-                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
-                               .addSink(new NoOpSink<Integer>());
-               
-               env.execute();
-       }
-
-       /**
-        * This tests whether timestamps are properly extracted in the timestamp
-        * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
-        * interval.
-        */
-       @Test
-       public void testTimestampExtractorWithAutoInterval() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.getConfig().setAutoWatermarkInterval(10);
-               env.setParallelism(1);
-               env.getConfig().disableSysoutLogging();
-
-
-               DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws Exception {
-                               int index = 1;
-                               while (index <= NUM_ELEMENTS) {
-                                       ctx.collect(index);
-                                       latch.await();
-                                       index++;
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {}
-               });
-
-               DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
-                               new AscendingTimestampExtractor<Integer>() {
-                                       @Override
-                                       public long extractAscendingTimestamp(Integer element) {
-                                               return element;
-                                       }
-                               });
-
-               extractOp
-                               .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-                               .transform("Timestamp Check",
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               new TimestampCheckingOperator());
-
-               // verify that extractor picks up source parallelism
-               Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
-
-               env.execute();
-
-               // verify that we get NUM_ELEMENTS watermarks
-               for (int j = 0; j < NUM_ELEMENTS; j++) {
-                       if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-                               long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
-                               Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
-                       }
-               }
-
-               // the input is finite, so it should have a MAX Watermark
-               assertEquals(Watermark.MAX_WATERMARK,
-                               CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-       }
-
-       /**
-        * This tests whether timestamps are properly extracted in the timestamp
-        * extractor and whether watermarks are correctly forwarded from the custom watermark emit
-        * function.
-        */
-       @Test
-       public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.getConfig().setAutoWatermarkInterval(10);
-               env.setParallelism(1);
-               env.getConfig().disableSysoutLogging();
-
-
-               DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws Exception {
-                               int index = 1;
-                               while (index <= NUM_ELEMENTS) {
-                                       ctx.collect(index);
-                                       latch.await();
-                                       index++;
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {}
-               });
-
-               source1
-                               .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-
-                                       @Override
-                                       public long extractTimestamp(Integer element, long currentTimestamp) {
-                                               return element;
-                                       }
-
-                                       @Override
-                                       public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-                                               return new Watermark(extractedTimestamp - 1);
-                                       }
-                               })
-                               .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-                               .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-
-               env.execute();
-
-               // verify that we get NUM_ELEMENTS watermarks
-               for (int j = 0; j < NUM_ELEMENTS; j++) {
-                       if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-                               Assert.fail("Wrong watermark.");
-                       }
-               }
-
-               // the input is finite, so it should have a MAX Watermark
-               assertEquals(Watermark.MAX_WATERMARK,
-                               CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-       }
-
-       /**
-        * This test verifies that the timestamp extractor does not emit decreasing watermarks even
-        * when the punctuated watermark function supplies decreasing watermarks.
-        */
-       @Test
-       public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.getConfig().setAutoWatermarkInterval(1);
-               env.setParallelism(1);
-               env.getConfig().disableSysoutLogging();
-
-
-               DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws Exception {
-                               int index = 1;
-                               while (index <= NUM_ELEMENTS) {
-                                       ctx.collect(index);
-                                       Thread.sleep(100);
-                                       ctx.collect(index - 1);
-                                       latch.await();
-                                       index++;
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {}
-               });
-
-               source1
-                               .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-
-                                       @Override
-                                       public long extractTimestamp(Integer element, long previousTimestamp) {
-                                               return element;
-                                       }
-
-                                       @Override
-                                       public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-                                               return new Watermark(extractedTimestamp - 1);
-                                       }
-                               })
-                               .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
-                               .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-
-               env.execute();
-
-               // verify that we get NUM_ELEMENTS watermarks
-               for (int j = 0; j < NUM_ELEMENTS; j++) {
-                       if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
-                               Assert.fail("Wrong watermark.");
-                       }
-               }
-               // the input is finite, so it should have a MAX Watermark
-               assertEquals(Watermark.MAX_WATERMARK,
-                               CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
-       }
-
-       /**
-        * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
-        */
-       @Test
-       public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.getConfig().setAutoWatermarkInterval(1);
-               env.setParallelism(2);
-               env.getConfig().disableSysoutLogging();
-
-
-               DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws Exception {
-                               int index = 1;
-                               while (index <= NUM_ELEMENTS) {
-                                       ctx.collectWithTimestamp(index, index);
-                                       ctx.collectWithTimestamp(index - 1, index - 1);
-                                       index++;
-                                       ctx.emitWatermark(new Watermark(index-2));
-                               }
-
-                               // emit the final Long.MAX_VALUE watermark, do it twice and verify that
-                               // we only see one in the result
-                               ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-                               ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-                       }
-
-                       @Override
-                       public void cancel() {}
-               });
-
-               source1
-                               .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-
-                                       @Override
-                                       public long extractTimestamp(Integer element, long currentTimestamp) {
-                                               return element;
-                                       }
-
-                                       @Override
-                                       public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
-                                               return null;
-                                       }
-                               })
-                       .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
-
-
-               env.execute();
-
-               Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
-               Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
-       }
-
-       /**
-        * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
-        * 
-        * Same test as before, but using a different timestamp extractor
-        */
-       @Test
-       public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
-               final int NUM_ELEMENTS = 10;
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment
-                               .createRemoteEnvironment("localhost", 
cluster.getLeaderRPCPort());
-
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.getConfig().setAutoWatermarkInterval(10);
-               env.setParallelism(2);
-               env.getConfig().disableSysoutLogging();
-
-               DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws Exception {
-                               int index = 1;
-                               while (index <= NUM_ELEMENTS) {
-                                       ctx.collectWithTimestamp(index, index);
-                                       ctx.collectWithTimestamp(index - 1, index - 1);
-                                       index++;
-                                       ctx.emitWatermark(new Watermark(index-2));
-                               }
-
-                               // emit the final Long.MAX_VALUE watermark, do it twice and verify that
-                               // we only see one in the result
-                               ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-                               ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-                       }
-
-                       @Override
-                       public void cancel() {}
-               });
-
-               source1
-                               .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Integer>() {
-
-                                       @Override
-                                       public long extractTimestamp(Integer element, long currentTimestamp) {
-                                               return element;
-                                       }
-
-                                       @Override
-                                       public Watermark getCurrentWatermark() {
-                                               return null;
-                                       }
-                               })
-                               .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
-               
-               env.execute();
-
-               Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
-               Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE);
-       }
-
-       /**
-        * This verifies that an event time source works when setting stream time characteristic to
-        * processing time. In this case, the watermarks should just be swallowed.
-        */
-       @Test
-       public void testEventTimeSourceWithProcessingTime() throws Exception {
-               StreamExecutionEnvironment env =
-                               StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-               
-               env.setParallelism(2);
-               env.getConfig().disableSysoutLogging();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-               DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10));
-
-               source1
-                       .map(new IdentityMap())
-                       .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
-
-               env.execute();
-
-               // verify that we don't get any watermarks, the source is used as watermark source in
-               // other tests, so it normally emits watermarks
-               Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
-       }
-       
-       @Test
-       public void testErrorOnEventTimeOverProcessingTime() {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               env.setParallelism(2);
-               env.getConfig().disableSysoutLogging();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-               DataStream<Tuple2<String, Integer>> source1 =
-                               env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
-
-               source1
-                               .keyBy(0)
-                               .window(TumblingEventTimeWindows.of(Time.seconds(5)))
-                               .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-                                       @Override
-                                       public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
-                                               return value1;
-                                       }
-                               })
-                               .print();
-
-               try {
-                       env.execute();
-                       fail("this should fail with an exception");
-               } catch (Exception e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testErrorOnEventTimeWithoutTimestamps() {
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-                               "localhost", cluster.getLeaderRPCPort());
-
-               env.setParallelism(2);
-               env.getConfig().disableSysoutLogging();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               DataStream<Tuple2<String, Integer>> source1 =
-                               env.fromElements(new Tuple2<>("a", 1), new 
Tuple2<>("b", 2));
-
-               source1
-                               .keyBy(0)
-                               .window(TumblingEventTimeWindows.of(Time.seconds(5)))
-                               .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-                                       @Override
-                                       public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
-                                               return value1;
-                                       }
-                               })
-                               .print();
-
-               try {
-                       env.execute();
-                       fail("this should fail with an exception");
-               } catch (Exception e) {
-                       // expected
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Custom Operators and Functions
-       // ------------------------------------------------------------------------
-
-       @SuppressWarnings("unchecked")
-       public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-               List<Watermark> watermarks;
-               public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
-               private final boolean timestampsEnabled;
-
-               public CustomOperator(boolean timestampsEnabled) {
-                       setChainingStrategy(ChainingStrategy.ALWAYS);
-                       this.timestampsEnabled = timestampsEnabled;
-               }
-
-               @Override
-               public void processElement(StreamRecord<Integer> element) throws Exception {
-                       if (timestampsEnabled) {
-                               if (element.getTimestamp() != element.getValue()) {
-                                       Assert.fail("Timestamps are not properly handled.");
-                               }
-                       }
-                       output.collect(element);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       for (Watermark previousMark: watermarks) {
-                               assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
-                       }
-                       watermarks.add(mark);
-                       latch.trigger();
-                       output.emitWatermark(mark);
-               }
-
-               @Override
-               public void open() throws Exception {
-                       super.open();
-                       watermarks = new ArrayList<>();
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-                       finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
-               }
-       }
-
-       public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-               public TimestampCheckingOperator() {
-                       setChainingStrategy(ChainingStrategy.ALWAYS);
-               }
-
-               @Override
-               public void processElement(StreamRecord<Integer> element) throws Exception {
-                       if (element.getTimestamp() != element.getValue()) {
-                               Assert.fail("Timestamps are not properly handled.");
-                       }
-                       output.collect(element);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {}
-       }
-
-       public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-               @Override
-               public void processElement(StreamRecord<Integer> element) throws Exception {
-                       if (element.hasTimestamp()) {
-                               Assert.fail("Timestamps are not properly handled.");
-                       }
-                       output.collect(element);
-               }
-
-               @Override
-               public void processWatermark(Watermark mark) throws Exception {}
-       }
-
-       public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
-               @Override
-               public Integer map1(Integer value) throws Exception {
-                       return value;
-               }
-
-               @Override
-               public Integer map2(Integer value) throws Exception {
-                       return value;
-               }
-       }
-
-       public static class IdentityMap implements MapFunction<Integer, Integer> {
-               @Override
-               public Integer map(Integer value) throws Exception {
-                       return value;
-               }
-       }
-
-       public static class MyTimestampSource implements SourceFunction<Integer> {
-
-               private final long initialTime;
-               private final int numWatermarks;
-
-               public MyTimestampSource(long initialTime, int numWatermarks) {
-                       this.initialTime = initialTime;
-                       this.numWatermarks = numWatermarks;
-               }
-
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       for (int i = 0; i < numWatermarks; i++) {
-                               ctx.collectWithTimestamp(i, initialTime + i);
-                               ctx.emitWatermark(new Watermark(initialTime + i));
-                       }
-               }
-
-               @Override
-               public void cancel() {}
-       }
-
-       public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
-
-               private final long initialTime;
-               private final int numWatermarks;
-
-               private volatile boolean running = true;
-               
-               public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
-                       this.initialTime = initialTime;
-                       this.numWatermarks = numWatermarks;
-               }
-
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       for (int i = 0; i < numWatermarks; i++) {
-                               ctx.collectWithTimestamp(i, initialTime + i);
-                               ctx.emitWatermark(new Watermark(initialTime + i));
-                       }
-                       
-                       while (running) {
-                               Thread.sleep(20);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public void stop() {
-                       running = false;
-               }
-       }
-
-       public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
-
-               int numWatermarks;
-
-               public MyNonWatermarkingSource(int numWatermarks) {
-                       this.numWatermarks = numWatermarks;
-               }
-
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       for (int i = 0; i < numWatermarks; i++) {
-                               ctx.collect(i);
-                       }
-               }
-
-               @Override
-               public void cancel() {}
-       }
-}
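
The removed test above exercises the punctuated/periodic watermark API end to end. As a minimal, self-contained sketch of the core pattern it covers (illustrative only, not part of this commit; it assumes the same API generation that the deleted code imports):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class WatermarkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.fromElements(1, 2, 3)
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
                    @Override
                    public long extractTimestamp(Integer element, long previousTimestamp) {
                        return element; // the element value doubles as its event-time timestamp
                    }

                    @Override
                    public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
                        // emit a punctuated watermark after every element
                        return new Watermark(extractedTimestamp - 1);
                    }
                })
                .print();

            env.execute("watermark sketch");
        }
    }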

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
deleted file mode 100644
index d398121..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public final class NoOpSink<T> extends RichSinkFunction<T> {
-       public void invoke(T tuple) {
-
-       }
-}
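
NoOpSink simply discards every record, so tests whose only observable effects are timestamps and watermarks (such as TimestampITCase above) can use it to terminate a pipeline. An illustrative fragment (names assumed, not from this commit):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
        .addSink(new NoOpSink<Integer>()); // output is discarded; the job runs for its side effects only
    env.execute();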

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
deleted file mode 100644
index a46ff55..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
-       private List<T> received;
-
-       public void invoke(T tuple) {
-               received.add(tuple);
-       }
-
-       public void open(Configuration conf) {
-               received = new ArrayList<T>();
-       }
-
-       public void close() {
-               assertTrue(received.size() > 0);
-       }
-}
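
Unlike NoOpSink, ReceiveCheckNoOpSink records what it receives and asserts in close() that at least one element arrived, so a test fails if the upstream produced no output. A hypothetical usage sketch:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements("a", "b", "c")
        .addSink(new ReceiveCheckNoOpSink<String>()); // fails at teardown if nothing reached the sink
    env.execute();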

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
deleted file mode 100644
index 7d6a6d0..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Test base for streaming programs relying on an open server socket to write to.
- */
-public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
-
-       protected static final String HOST = "localhost";
-       protected static Integer port;
-       protected Set<String> dataReadFromSocket = new HashSet<String>();
-
-       @Override
-       protected void preSubmit() throws Exception {
-               port = NetUtils.getAvailablePort();
-               temporarySocket = createLocalSocket(port);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
-               Assert.assertEquals(expectedData, dataReadFromSocket);
-               temporarySocket.close();
-       }
-
-       protected ServerSocket temporarySocket;
-
-       public ServerSocket createLocalSocket(int port) throws Exception {
-               ServerSocket serverSocket = new ServerSocket(port);
-               ServerThread st = new ServerThread(serverSocket);
-               st.start();
-               return serverSocket;
-       }
-
-       protected class ServerThread extends Thread {
-
-               private ServerSocket serverSocket;
-               private Thread t;
-
-               public ServerThread(ServerSocket serverSocket) {
-                       this.serverSocket = serverSocket;
-                       t = new Thread(this);
-               }
-
-               public void waitForAccept() throws Exception {
-                       Socket socket = serverSocket.accept();
-                       BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-                       DeserializationSchema<String> schema = new SimpleStringSchema();
-                       String rawData = in.readLine();
-                       while (rawData != null){
-                               String string = schema.deserialize(rawData.getBytes());
-                               dataReadFromSocket.add(string);
-                               rawData = in.readLine();
-                       }
-                       socket.close();
-               }
-
-               public void run() {
-                       try {
-                               waitForAccept();
-                       } catch (Exception e) {
-                               Assert.fail();
-                               throw new RuntimeException(e);
-                       }
-               }
-
-               @Override
-               public void start() {
-                       t.start();
-               }
-       }
-}
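
A concrete test would implement testProgram() and write to the socket that preSubmit() opened. A hedged sketch follows; the class name and body are hypothetical, assuming the DataStream#writeToSocket method of this API generation:

    public class MySocketOutputITCase extends SocketOutputTestBase {
        @Override
        protected void testProgram() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // HOST and port come from the test base; the server socket is already listening
            env.fromElements("(hello,1)", "(world,1)")
                .writeToSocket(HOST, port, new SimpleStringSchema());
            env.execute();
        }
    }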

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index 8cdedd5..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- *   {@literal @}Test
- *   public void someTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
-
-       // ------------------------------------------------------------------------
-       //  The mini cluster that is shared across tests
-       // ------------------------------------------------------------------------
-
-       protected static final int DEFAULT_PARALLELISM = 4;
-
-       protected static ForkableFlinkMiniCluster cluster;
-
-       public StreamingMultipleProgramsTestBase() {
-               super(new Configuration());
-       }
-
-       // ------------------------------------------------------------------------
-       //  Cluster setup & teardown
-       // ------------------------------------------------------------------------
-
-       @BeforeClass
-       public static void setup() throws Exception {
-               cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
-               TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-       }
-
-       @AfterClass
-       public static void teardown() throws Exception {
-               TestStreamEnvironment.unsetAsContext();
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-       }
-}
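
Spelled out as a full class, the javadoc example above would look roughly like this (class and test names are hypothetical):

    public class MyStreamingProgramsITCase extends StreamingMultipleProgramsTestBase {

        @Test
        public void someTest() throws Exception {
            // returns the TestStreamEnvironment installed by setup(), backed by the shared cluster
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).addSink(new NoOpSink<Integer>());
            env.execute();
        }
    }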

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
deleted file mode 100644
index 50ed1cf..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
-       protected static final int DEFAULT_PARALLELISM = 4;
-
-       private int parallelism;
-       
-       
-       public StreamingProgramTestBase() {
-               super(new Configuration());
-               setParallelism(DEFAULT_PARALLELISM);
-       }
-
-
-       public void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-               setTaskManagerNumSlots(parallelism);
-       }
-       
-       public int getParallelism() {
-               return parallelism;
-       }
-       
-
-       // --------------------------------------------------------------------------------------------
-       //  Methods to create the test program and for pre- and post- test work
-       // --------------------------------------------------------------------------------------------
-
-       protected abstract void testProgram() throws Exception;
-
-       protected void preSubmit() throws Exception {}
-       
-       protected void postSubmit() throws Exception {}
-       
-       // --------------------------------------------------------------------------------------------
-       //  Test entry point
-       // --------------------------------------------------------------------------------------------
-
-       @Test
-       public void testJob() throws Exception {
-               try {
-                       // pre-submit
-                       try {
-                               preSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               fail("Pre-submit work caused an error: " + e.getMessage());
-                       }
-
-                       // prepare the test environment
-                       startCluster();
-
-                       TestStreamEnvironment.setAsContext(this.executor, getParallelism());
-
-                       // call the test program
-                       try {
-                               testProgram();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               fail("Error while calling the test program: " + e.getMessage());
-                       }
-                       finally {
-                               TestStreamEnvironment.unsetAsContext();
-                       }
-
-                       // post-submit
-                       try {
-                               postSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               fail("Post-submit work caused an error: " + e.getMessage());
-                       }
-               }
-               finally {
-                       stopCluster();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
deleted file mode 100644
index 423d08e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public class TestListResultSink<T> extends RichSinkFunction<T> {
-
-       private static final long serialVersionUID = 1L;
-       private int resultListId;
-
-       public TestListResultSink() {
-               this.resultListId = TestListWrapper.getInstance().createList();
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-       }
-
-       @Override
-       public void invoke(T value) throws Exception {
-               synchronized (resultList()) {
-                       resultList().add(value);
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               super.close();
-       }
-
-       @SuppressWarnings("unchecked")
-       private List<T> resultList() {
-               synchronized (TestListWrapper.getInstance()) {
-                       return (List<T>) TestListWrapper.getInstance().getList(resultListId);
-               }
-       }
-
-       public List<T> getResult() {
-               synchronized (resultList()) {
-                       ArrayList<T> copiedList = new ArrayList<T>(resultList());
-                       return copiedList;
-               }
-       }
-
-       public List<T> getSortedResult() {
-               synchronized (resultList()) {
-                       TreeSet<T> treeSet = new TreeSet<T>(resultList());
-                       ArrayList<T> sortedList = new ArrayList<T>(treeSet);
-                       return sortedList;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
deleted file mode 100644
index 751f836..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestListWrapper {
-
-       private static TestListWrapper instance;
-
-       @SuppressWarnings("rawtypes")
-       private List<List<? extends Comparable>> lists;
-
-       @SuppressWarnings("rawtypes")
-       private TestListWrapper() {
-               lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
-       }
-
-       public static TestListWrapper getInstance() {
-               if (instance == null) {
-                       instance = new TestListWrapper();
-               }
-               return instance;
-       }
-
-       /**
-        * Creates and stores a list, returns with the id.
-        *
-        * @return The ID of the list.
-        */
-       @SuppressWarnings("rawtypes")
-       public int createList() {
-               lists.add(new ArrayList<Comparable>());
-               return lists.size() - 1;
-       }
-
-       public List<?> getList(int listId) {
-               @SuppressWarnings("rawtypes")
-               List<? extends Comparable> list = lists.get(listId);
-               if (list == null) {
-                       throw new RuntimeException("No such list.");
-               }
-
-               return list;
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
deleted file mode 100644
index c700102..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.Preconditions;
-
-/**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
- */
-public class TestStreamEnvironment extends StreamExecutionEnvironment {
-       
-       /** The mini cluster in which this environment executes its jobs */
-       private ForkableFlinkMiniCluster executor;
-       
-
-       public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
-               this.executor = Preconditions.checkNotNull(executor);
-               setParallelism(parallelism);
-       }
-       
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               final StreamGraph streamGraph = getStreamGraph();
-               streamGraph.setJobName(jobName);
-               final JobGraph jobGraph = streamGraph.getJobGraph();
-               return executor.submitJobAndWait(jobGraph, false);
-       }
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
-        * the given cluster with the given default parallelism.
-        * 
-        * @param cluster The test cluster to run the test program on.
-        * @param parallelism The default parallelism for the test programs.
-        */
-       public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
-
-               StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
-                       @Override
-                       public StreamExecutionEnvironment createExecutionEnvironment() {
-                               return new TestStreamEnvironment(cluster, parallelism);
-                       }
-               };
-
-               initializeContextEnvironment(factory);
-       }
-
-       /**
-        * Resets the streaming context environment to null.
-        */
-       public static void unsetAsContext() {
-               resetContextEnvironment();
-       } 
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index ffbcb87..b82faf1 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -98,15 +98,6 @@ under the License.
                        <type>test-jar</type>
                </dependency>
 
-               <!-- To access streaming test utils -->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
deleted file mode 100644
index 7b3ed67..0000000
--- a/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.api;
-
-import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
-import org.apache.flink.streaming.util.SocketOutputTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-import org.junit.Ignore;
-
-@Ignore
-//This test sometimes fails most likely due to the behaviour
-//of the socket. Disabled for now.
-public class SocketOutputFormatITCase extends SocketOutputTestBase {
-
-               @Override
-               protected void testProgram() throws Exception {
-                       OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
-               }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 3c35cf1..238c2da 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -58,6 +58,13 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>${junit.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
new file mode 100644
index 0000000..c5fbaf0
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for streaming unit tests that run multiple tests and want to reuse the same
+ * Flink cluster. This saves a significant amount of time, since the startup and
+ * shutdown of the Flink clusters (including actor systems, etc) usually dominates
+ * the execution of the actual tests.
+ *
+ * To write a unit test against this test base, simply extend it and add
+ * one or more regular test methods and retrieve the StreamExecutionEnvironment from
+ * the context:
+ *
+ * <pre>
+ *   {@literal @}Test
+ *   public void someTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ *   {@literal @}Test
+ *   public void anotherTest() {
+ *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *       // test code
+ *       env.execute();
+ *   }
+ *
+ * </pre>
+ */
+public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
+
+       // ------------------------------------------------------------------------
+       //  The mini cluster that is shared across tests
+       // ------------------------------------------------------------------------
+
+       protected static final int DEFAULT_PARALLELISM = 4;
+
+       protected static ForkableFlinkMiniCluster cluster;
+
+       public StreamingMultipleProgramsTestBase() {
+               super(new Configuration());
+       }
+
+       // ------------------------------------------------------------------------
+       //  Cluster setup & teardown
+       // ------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, false, false, true);
+               TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               TestStreamEnvironment.unsetAsContext();
+               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+       }
+}
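
For tests migrating to the relocated base class, a minimal subclass could look
like the sketch below (class, method, and job names are illustrative, not part
of this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    import org.junit.Test;

    public class ExampleSharedClusterITCase extends StreamingMultipleProgramsTestBase {

        @Test
        public void testOnSharedCluster() throws Exception {
            // getExecutionEnvironment() returns the TestStreamEnvironment that
            // setup() registered as context, so the job runs on the shared mini cluster
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3).print();
            env.execute("shared cluster example");
        }
    }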

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
new file mode 100644
index 0000000..50ed1cf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public abstract class StreamingProgramTestBase extends AbstractTestBase {
+
+       protected static final int DEFAULT_PARALLELISM = 4;
+
+       private int parallelism;
+       
+       
+       public StreamingProgramTestBase() {
+               super(new Configuration());
+               setParallelism(DEFAULT_PARALLELISM);
+       }
+
+
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               setTaskManagerNumSlots(parallelism);
+       }
+       
+       public int getParallelism() {
+               return parallelism;
+       }
+       
+
+       // --------------------------------------------------------------------------------------------
+       //  Methods to create the test program and for pre- and post- test work
+       // --------------------------------------------------------------------------------------------
+
+       protected abstract void testProgram() throws Exception;
+
+       protected void preSubmit() throws Exception {}
+       
+       protected void postSubmit() throws Exception {}
+       
+       // --------------------------------------------------------------------------------------------
+       //  Test entry point
+       // --------------------------------------------------------------------------------------------
+
+       @Test
+       public void testJob() throws Exception {
+               try {
+                       // pre-submit
+                       try {
+                               preSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Pre-submit work caused an error: " + e.getMessage());
+                       }
+
+                       // prepare the test environment
+                       startCluster();
+
+                       TestStreamEnvironment.setAsContext(this.executor, getParallelism());
+
+                       // call the test program
+                       try {
+                               testProgram();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Error while calling the test program: " + e.getMessage());
+                       }
+                       finally {
+                               TestStreamEnvironment.unsetAsContext();
+                       }
+
+                       // post-submit
+                       try {
+                               postSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Post-submit work caused an error: " + e.getMessage());
+                       }
+               }
+               finally {
+                       stopCluster();
+               }
+       }
+}
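
The testJob() entry point above drives the preSubmit() / testProgram() /
postSubmit() lifecycle. A minimal sketch of a subclass (names and the expected
data are illustrative, not part of this commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.StreamingProgramTestBase;

    public class ExampleProgramITCase extends StreamingProgramTestBase {

        private String resultPath;

        @Override
        protected void preSubmit() throws Exception {
            // runs before the mini cluster is started
            resultPath = getTempDirPath("result");
        }

        @Override
        protected void testProgram() throws Exception {
            // runs with the TestStreamEnvironment that testJob() set as context
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c").writeAsText(resultPath);
            env.execute("example program");
        }

        @Override
        protected void postSubmit() throws Exception {
            // runs after the job finished; verify what was written
            compareResultsByLinesInMemory("a\nb\nc", resultPath);
        }
    }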

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
new file mode 100644
index 0000000..c700102
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A StreamExecutionEnvironment that executes its jobs on a test cluster.
+ */
+public class TestStreamEnvironment extends StreamExecutionEnvironment {
+       
+       /** The mini cluster in which this environment executes its jobs */
+       private ForkableFlinkMiniCluster executor;
+       
+
+       public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+               this.executor = Preconditions.checkNotNull(executor);
+               setParallelism(parallelism);
+       }
+       
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               final StreamGraph streamGraph = getStreamGraph();
+               streamGraph.setJobName(jobName);
+               final JobGraph jobGraph = streamGraph.getJobGraph();
+               return executor.submitJobAndWait(jobGraph, false);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
+        * the given cluster with the given default parallelism.
+        * 
+        * @param cluster The test cluster to run the test program on.
+        * @param parallelism The default parallelism for the test programs.
+        */
+       public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+
+               StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
+                       @Override
+                       public StreamExecutionEnvironment createExecutionEnvironment() {
+                               return new TestStreamEnvironment(cluster, parallelism);
+                       }
+               };
+
+               initializeContextEnvironment(factory);
+       }
+
+       /**
+        * Resets the streaming context environment to null.
+        */
+       public static void unsetAsContext() {
+               resetContextEnvironment();
+       } 
+}
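
Outside the test bases, setAsContext() / unsetAsContext() can bracket a mini
cluster directly. A sketch following the same pattern as the setup/teardown
methods in the test bases above (the wrapper class is illustrative, not part
of this commit):

    import org.apache.flink.streaming.util.TestStreamEnvironment;
    import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    import org.apache.flink.test.util.TestBaseUtils;

    public final class TestClusterRunner {

        public static void runWithTestCluster(Runnable testCode) throws Exception {
            // one task manager with 4 slots, as in the test bases above
            ForkableFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
            try {
                // from here on, StreamExecutionEnvironment.getExecutionEnvironment()
                // yields a TestStreamEnvironment bound to this cluster
                TestStreamEnvironment.setAsContext(cluster, 4);
                testCode.run();
            } finally {
                TestStreamEnvironment.unsetAsContext();
                TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
            }
        }
    }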

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 9fd8c3e..77216e0 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -542,7 +542,7 @@ under the License.
                                                                                </goals>
                                                                        </pluginExecutionFilter>
                                                                        <action>
-                                                                               <ignore></ignore>
+                                                                               <ignore/>
                                                                        </action>
                                                                </pluginExecution>
                                                        </pluginExecutions>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
new file mode 100644
index 0000000..5d99de4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
+
+       private String resultPath1;
+       private String resultPath2;
+       private String expected1;
+       private String expected2;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception {
+               resultPath1 = tempFolder.newFile().toURI().toString();
+               resultPath2 = tempFolder.newFile().toURI().toString();
+               expected1 = "";
+               expected2 = "";
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected1, resultPath1);
+               compareResultsByLinesInMemory(expected2, resultPath2);
+       }
+
+       /**
+        * Tests the proper functioning of the streaming fold operator. For this purpose, a stream
+        * of Tuple2<Integer, Integer> is created. The stream is grouped according to the first tuple
+        * value. Each group is folded where the second tuple value is summed up.
+        *
+        * This test relies on the hash function used by the {@link DataStream#keyBy}, which is
+        * assumed to be {@link MathUtils#murmurHash}.
+        */
+       @Test
+       public void testGroupedFoldOperation() throws Exception {
+               int numElements = 10;
+               final int numKeys = 2;
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
+
+               SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
+                       .keyBy(0)
+                       .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
+                               @Override
+                               public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
+                                       return accumulator + value.f1;
+                               }
+                       }).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+                               int key = -1;
+                               @Override
+                               public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+                                       if (key == -1){
+                                               key = MathUtils.murmurHash(value) % numKeys;
+                                       }
+                                       return new Tuple2<>(key, value);
+                               }
+                       }).split(new OutputSelector<Tuple2<Integer, Integer>>() {
+                               @Override
+                               public Iterable<String> select(Tuple2<Integer, Integer> value) {
+                                       List<String> output = new ArrayList<>();
+
+                                       output.add(value.f0 + "");
+                                       return output;
+                               }
+                       });
+
+               splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
+                       @Override
+                       public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+                               return value.f1;
+                       }
+               }).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+               splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
+                       @Override
+                       public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+                               return value.f1;
+                       }
+               }).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+               StringBuilder builder1 = new StringBuilder();
+               StringBuilder builder2 = new StringBuilder();
+               int counter1 = 0;
+               int counter2 = 0;
+
+               for (int i = 0; i < numElements; i++) {
+                       if (MathUtils.murmurHash(i) % numKeys == 0) {
+                               counter1 += i;
+                               builder1.append(counter1 + "\n");
+                       } else {
+                               counter2 += i;
+                               builder2.append(counter2 + "\n");
+                       }
+               }
+
+               expected1 = builder1.toString();
+               expected2 = builder2.toString();
+
+               env.execute();
+       }
+
+       /**
+        * Tests whether the fold operation can also be called with non-Java-serializable types.
+        */
+       @Test
+       public void testFoldOperationWithNonJavaSerializableType() throws Exception {
+               final int numElements = 10;
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
+
+               input
+                       .keyBy(0)
+                       .fold(
+                               new NonSerializable(42),
+                               new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
+                                       @Override
+                                       public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
+                                               return new NonSerializable(accumulator.value + value.f1.value);
+                                       }
+                       })
+                       .map(new MapFunction<NonSerializable, Integer>() {
+                               @Override
+                               public Integer map(NonSerializable value) throws Exception {
+                                       return value.value;
+                               }
+                       })
+                       .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+               StringBuilder builder = new StringBuilder();
+
+               for (int i = 0; i < numElements; i++) {
+                       builder.append(42 + i + "\n");
+               }
+
+               expected1 = builder.toString();
+
+               env.execute();
+       }
+
+       private static class NonSerializable {
+               // This makes the type non-serializable
+               private final Object obj = new Object();
+
+               private final int value;
+
+               public NonSerializable(int value) {
+                       this.value = value;
+               }
+       }
+
+       private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
+               private final int numElements;
+
+               public NonSerializableTupleSource(int numElements) {
+                       this.numElements = numElements;
+               }
+
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
+                       for (int i = 0; i < numElements; i++) {
+                               ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
+                       }
+               }
+
+               @Override
+               public void cancel() {}
+       }
+
+       private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
+
+               private final int numElements;
+               private final int numKeys;
+
+               public TupleSource(int numElements, int numKeys) {
+                       this.numElements = numElements;
+                       this.numKeys = numKeys;
+               }
+
+               @Override
+               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+                       for (int i = 0; i < numElements; i++) {
+                               // keys '1' and '2' hash to different buckets
+                               Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
+                               ctx.collect(result);
+                       }
+               }
+
+               @Override
+               public void cancel() {
+               }
+       }
+}
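
The grouped fold pattern this test exercises, reduced to its core as a
self-contained sketch (input values and names are illustrative):

    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GroupedFoldExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // per key, fold emits the running accumulator once per input element
            DataStream<Integer> runningSums = env
                    .fromElements(new Tuple2<>(1, 10), new Tuple2<>(1, 20), new Tuple2<>(2, 5))
                    .keyBy(0)
                    .fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
                        @Override
                        public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) {
                            return accumulator + value.f1;
                        }
                    });

            runningSums.print(); // key 1 emits 10, then 30; key 2 emits 5
            env.execute("grouped fold example");
        }
    }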

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..c2155ac
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+               DataStream<Tuple2<String, Integer>> counts = text
+                               .flatMap(new Tokenizer())
+                               .keyBy(0).sum(1);
+
+               counts.writeAsCsv(resultPath);
+
+               env.execute("WriteAsCsvTest");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               // Strip the parentheses from the expected text-like output
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+                               .replaceAll("[\\\\(\\\\)]", ""), resultPath);
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+                               throws Exception {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
new file mode 100644
index 0000000..2940e6d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.outputformat;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+               DataStream<Tuple2<String, Integer>> counts = text
+                               .flatMap(new CsvOutputFormatITCase.Tokenizer())
+                               .keyBy(0).sum(1);
+
+               counts.writeAsText(resultPath);
+
+               env.execute("WriteAsTextTest");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
new file mode 100644
index 0000000..d21985b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/ChainedRuntimeContextITCase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+@SuppressWarnings("serial")
+public class ChainedRuntimeContextITCase extends StreamingMultipleProgramsTestBase {
+       private static RuntimeContext srcContext;
+       private static RuntimeContext mapContext;
+
+       @Test
+       public void test() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               env.addSource(new TestSource()).map(new TestMap()).addSink(new DiscardingSink<Integer>());
+               env.execute();
+
+               assertNotEquals(srcContext, mapContext);
+
+       }
+
+       private static class TestSource extends RichParallelSourceFunction<Integer> {
+
+               @Override
+               public void run(SourceContext<Integer> ctx) throws Exception {
+               }
+
+               @Override
+               public void cancel() {
+               }
+
+               @Override
+               public void open(Configuration c) {
+                       srcContext = getRuntimeContext();
+               }
+
+       }
+
+       private static class TestMap extends RichMapFunction<Integer, Integer> {
+
+               @Override
+               public Integer map(Integer value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public void open(Configuration c) {
+                       mapContext = getRuntimeContext();
+               }
+
+       }
+
+}
