http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java 
b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 6215f31..638eb5c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,13 +42,13 @@ import java.util.Random;
 
 import static org.hamcrest.Matchers.is;
 
-/*
+/**
  * These programs demonstrate the effects of user defined functions which 
modify input objects or return locally created
  * objects that are retained and reused on future calls. The programs do not 
retain and later modify input objects.
  */
 public class OverwriteObjects {
 
-       public final static Logger LOG = 
LoggerFactory.getLogger(OverwriteObjects.class);
+       public static final Logger LOG = 
LoggerFactory.getLogger(OverwriteObjects.class);
 
        // DataSets are created with this number of elements
        private static final int NUMBER_OF_ELEMENTS = 3_000_000;
@@ -71,7 +72,7 @@ public class OverwriteObjects {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                env.getConfig().disableSysoutLogging();
 
-               for (int parallelism = MAX_PARALLELISM ; parallelism > 0 ; 
parallelism--) {
+               for (int parallelism = MAX_PARALLELISM; parallelism > 0; 
parallelism--) {
                        LOG.info("Parallelism = {}", parallelism);
 
                        env.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java 
b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
index c8604cb..46be968 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
@@ -36,7 +36,7 @@ import java.util.Random;
  * (See also 
http://peel-framework.org/2016/04/07/hash-aggregations-in-flink.html)
  */
 public class ReducePerformance {
-       
+
        public static void main(String[] args) throws Exception {
 
                final int numElements = 40_000_000;
@@ -120,7 +120,7 @@ public class ReducePerformance {
                        int rem = numElements % numPartitions;
                        SplittableRandomIterator<T, B>[] res = new 
SplittableRandomIterator[numPartitions];
                        for (int i = 0; i < numPartitions; i++) {
-                               res[i] = new SplittableRandomIterator<T, B>(i < 
rem ? splitSize : splitSize + 1, (B)baseIterator.copy());
+                               res[i] = new SplittableRandomIterator<T, B>(i < 
rem ? splitSize : splitSize + 1, (B) baseIterator.copy());
                        }
                        return res;
                }
@@ -140,7 +140,6 @@ public class ReducePerformance {
                CopyableIterator<T> copy();
        }
 
-
        private static final class TupleIntIntIterator implements 
CopyableIterator<Tuple2<Integer, Integer>>, Serializable {
 
                private final int keyRange;
@@ -183,7 +182,6 @@ public class ReducePerformance {
                }
        }
 
-
        private static final class TupleStringIntIterator implements 
CopyableIterator<Tuple2<String, Integer>>, Serializable {
 
                private final int keyRange;
@@ -226,7 +224,6 @@ public class ReducePerformance {
                }
        }
 
-
        private static final class SumReducer<K> implements 
ReduceFunction<Tuple2<K, Integer>> {
                @Override
                public Tuple2<K, Integer> reduce(Tuple2<K, Integer> a, 
Tuple2<K, Integer> b) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 90dbe80..c7f43fa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -31,24 +31,27 @@ import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Manual test to evaluate impact of checkpointing on latency.
+ */
 public class StreamingScalabilityAndLatency {
-       
+
        public static void main(String[] args) throws Exception {
                if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
                        throw new RuntimeException("This test program needs to 
run with at least 5GB of heap space.");
                }
-               
-               final int TASK_MANAGERS = 1;
-               final int SLOTS_PER_TASK_MANAGER = 80;
-               final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+
+               final int taskManagers = 1;
+               final int slotsPerTaskManager = 80;
+               final int parallelism = taskManagers * slotsPerTaskManager;
 
                LocalFlinkMiniCluster cluster = null;
 
                try {
                        Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
SLOTS_PER_TASK_MANAGER);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slotsPerTaskManager);
                        
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
                        config.setInteger("taskmanager.net.server.numThreads", 
1);
@@ -56,8 +59,8 @@ public class StreamingScalabilityAndLatency {
 
                        cluster = new LocalFlinkMiniCluster(config, false);
                        cluster.start();
-                       
-                       runPartitioningProgram(cluster.getLeaderRPCPort(), 
PARALLELISM);
+
+                       runPartitioningProgram(cluster.getLeaderRPCPort(), 
parallelism);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -69,7 +72,7 @@ public class StreamingScalabilityAndLatency {
                        }
                }
        }
-       
+
        private static void runPartitioningProgram(int jobManagerPort, int 
parallelism) throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
                env.setParallelism(parallelism);
@@ -83,23 +86,22 @@ public class StreamingScalabilityAndLatency {
                        .map(new IdMapper<Tuple2<Long, Long>>())
                        .keyBy(0)
                        .addSink(new TimestampingSink());
-               
+
                env.execute("Partitioning Program");
        }
-       
-       public static class TimeStampingSource implements 
ParallelSourceFunction<Tuple2<Long, Long>> {
+
+       private static class TimeStampingSource implements 
ParallelSourceFunction<Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 
-151782334777482511L;
 
                private volatile boolean running = true;
-               
-               
+
                @Override
                public void run(SourceContext<Tuple2<Long, Long>> ctx) throws 
Exception {
-                       
+
                        long num = 100;
                        long counter = (long) (Math.random() * 4096);
-                       
+
                        while (running) {
                                if (num < 100) {
                                        num++;
@@ -119,14 +121,14 @@ public class StreamingScalabilityAndLatency {
                        running = false;
                }
        }
-       
-       public static class TimestampingSink implements 
SinkFunction<Tuple2<Long, Long>> {
+
+       private static class TimestampingSink implements 
SinkFunction<Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 
1876986644706201196L;
 
                private long maxLatency;
-               private long count; 
-               
+               private long count;
+
                @Override
                public void invoke(Tuple2<Long, Long> value) {
                        long ts = value.f1;
@@ -134,7 +136,7 @@ public class StreamingScalabilityAndLatency {
                                long diff = System.currentTimeMillis() - ts;
                                maxLatency = Math.max(diff, maxLatency);
                        }
-                       
+
                        count++;
                        if (count == 5000) {
                                System.out.println("Max latency: " + 
maxLatency);
@@ -144,7 +146,7 @@ public class StreamingScalabilityAndLatency {
                }
        }
 
-       public static class IdMapper<T> implements MapFunction<T, T> {
+       private static class IdMapper<T> implements MapFunction<T, T> {
 
                private static final long serialVersionUID = 
-6543809409233225099L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java 
b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
index 1c5744d..bd5123a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
@@ -21,4 +21,5 @@
  * need to be manually invoked, because they are extremely heavy, time 
intensive,
  * or require larger-than-usual JVMs.
  */
-package org.apache.flink.test.manual;
\ No newline at end of file
+
+package org.apache.flink.test.manual;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index eea2509..1fb5e65 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.test.misc;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -33,6 +30,7 @@ import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +39,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 /**
  * This test verifies that the auto parallelism is properly forwarded to the 
runtime.
  */
@@ -79,7 +80,6 @@ public class AutoParallelismITCase extends TestLogger {
                }
        }
 
-
        @Test
        public void testProgramWithAutoParallelism() {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
index 39a08d2..b8f1d80 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -24,8 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
+
 import org.junit.Assert;
 
+/**
+ * Integration tests for custom {@link Partitioner}.
+ */
 @SuppressWarnings("serial")
 public class CustomPartitioningITCase extends JavaProgramTestBase {
 
@@ -36,17 +40,17 @@ public class CustomPartitioningITCase extends 
JavaProgramTestBase {
                if (!isCollectionExecution()) {
                        Assert.assertTrue(env.getParallelism() > 1);
                }
-               
+
                env.generateSequence(1, 1000)
                        .partitionCustom(new AllZeroPartitioner(), new 
IdKeySelector<Long>())
                        .map(new FailExceptInPartitionZeroMapper())
                        .output(new DiscardingOutputFormat<Long>());
-               
+
                env.execute();
        }
-       
-       public static class FailExceptInPartitionZeroMapper extends 
RichMapFunction<Long, Long> {
-               
+
+       private static class FailExceptInPartitionZeroMapper extends 
RichMapFunction<Long, Long> {
+
                @Override
                public Long map(Long value) throws Exception {
                        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
@@ -56,15 +60,15 @@ public class CustomPartitioningITCase extends 
JavaProgramTestBase {
                        }
                }
        }
-       
-       public static class AllZeroPartitioner implements Partitioner<Long> {
+
+       private static class AllZeroPartitioner implements Partitioner<Long> {
                @Override
                public int partition(Long key, int numPartitions) {
                        return 0;
                }
        }
-       
-       public static class IdKeySelector<T> implements KeySelector<T, T> {
+
+       private static class IdKeySelector<T> implements KeySelector<T, T> {
                @Override
                public T getKey(T value) {
                        return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 76480ba..1532741 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.types.Value;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,11 +41,15 @@ import java.io.IOException;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test for proper error messages in case user-defined serialization is broken
+ * and detected in the network stack.
+ */
 @SuppressWarnings("serial")
 public class CustomSerializationITCase extends TestLogger {
 
        private static final int PARLLELISM = 5;
-       
+
        private static LocalFlinkMiniCluster cluster;
 
        private static TestEnvironment env;
@@ -66,13 +70,13 @@ public class CustomSerializationITCase extends TestLogger {
                cluster.shutdown();
                cluster = null;
        }
-       
+
        @Test
        public void testIncorrectSerializer1() {
                try {
                        env.setParallelism(PARLLELISM);
                        env.getConfig().disableSysoutLogging();
-                       
+
                        env
                                .generateSequence(1, 10 * PARLLELISM)
                                .map(new MapFunction<Long, ConsumesTooMuch>() {
@@ -83,7 +87,7 @@ public class CustomSerializationITCase extends TestLogger {
                                })
                                .rebalance()
                                .output(new 
DiscardingOutputFormat<ConsumesTooMuch>());
-                       
+
                        env.execute();
                }
                catch (JobExecutionException e) {
@@ -186,11 +190,14 @@ public class CustomSerializationITCase extends TestLogger 
{
                        fail(e.getMessage());
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Custom Data Types with broken Serialization Logic
        // 
------------------------------------------------------------------------
-       
+
+       /**
+        * {@link Value} reading more data than written.
+        */
        public static class ConsumesTooMuch implements Value {
 
                @Override
@@ -206,6 +213,9 @@ public class CustomSerializationITCase extends TestLogger {
                }
        }
 
+       /**
+        * {@link Value} reading more buffers than written.
+        */
        public static class ConsumesTooMuchSpanning implements Value {
 
                @Override
@@ -221,6 +231,9 @@ public class CustomSerializationITCase extends TestLogger {
                }
        }
 
+       /**
+        * {@link Value} reading less data than written.
+        */
        public static class ConsumesTooLittle implements Value {
 
                @Override
@@ -236,6 +249,9 @@ public class CustomSerializationITCase extends TestLogger {
                }
        }
 
+       /**
+        * {@link Value} reading fewer buffers than written.
+        */
        public static class ConsumesTooLittleSpanning implements Value {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index fa1fcb6..c004759 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -21,20 +21,23 @@ package org.apache.flink.test.misc;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Test TypeInfo serializer tree.
+ */
 public class GenericTypeInfoTest {
 
        @Test
        public void testSerializerTree() {
                @SuppressWarnings("unchecked")
-               TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> 
ti = 
-                               
(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) 
+               TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> 
ti =
+                               
(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>)
                                                
TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
-               
+
                String serTree = Utils.getSerializerTree(ti);
                // We can not test against the entire output because the fields 
of 'String' differ
                // between java versions
@@ -67,7 +70,7 @@ public class GenericTypeInfoTest {
                                "            lowestSetBit:int\n" +
                                "            firstNonzeroIntNum:int\n" +
                                "    mixed:java.util.List\n" +
-                               "    
makeMeGeneric:org.apache.flink.test.javaApiOperators.util.CollectionDataSets$PojoWithDateAndEnum\n"
 +
+                               "    
makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum\n"
 +
                                "        group:java.lang.String\n"));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 7dab0f1..00b4485 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -30,24 +30,26 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the system behavior in multiple corner cases
  *   - when null records are passed through the system.
  *   - when disjoint dataflows are executed
  *   - when accumulators are used chained after a non-udf operator.
- *   
- * The tests are bundled into one class to reuse the same test cluster. This 
speeds
+ *
+ * <p>The tests are bundled into one class to reuse the same test cluster. 
This speeds
  * up test execution, as the majority of the test time goes usually into 
starting/stopping the
  * test cluster.
  */
@@ -59,7 +61,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
        private static LocalFlinkMiniCluster cluster;
 
        private static TestEnvironment env;
-       
+
        @BeforeClass
        public static void startCluster() {
                Configuration config = new Configuration();
@@ -72,13 +74,13 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
                env = new TestEnvironment(cluster, PARALLELISM, false);
        }
-       
+
        @AfterClass
        public static void shutdownCluster() {
                cluster.shutdown();
                cluster = null;
        }
-       
+
        @Test
        public void testNullValues() {
                try {
@@ -128,13 +130,13 @@ public class MiscellaneousIssuesITCase extends TestLogger 
{
 
        @Test
        public void testAccumulatorsAfterNoOp() {
-               
-               final String ACC_NAME = "test_accumulator";
-               
+
+               final String accName = "test_accumulator";
+
                try {
                        env.setParallelism(6);
                        env.getConfig().disableSysoutLogging();
-                       
+
                        env.generateSequence(1, 1000000)
                                        .rebalance()
                                        .flatMap(new RichFlatMapFunction<Long, 
Long>() {
@@ -143,7 +145,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
 
                                                @Override
                                                public void open(Configuration 
parameters) {
-                                                       counter = 
getRuntimeContext().getLongCounter(ACC_NAME);
+                                                       counter = 
getRuntimeContext().getLongCounter(accName);
                                                }
 
                                                @Override
@@ -154,8 +156,8 @@ public class MiscellaneousIssuesITCase extends TestLogger {
                                        .output(new 
DiscardingOutputFormat<Long>());
 
                        JobExecutionResult result = env.execute();
-                       
-                       assertEquals(1000000L, 
result.getAllAccumulatorResults().get(ACC_NAME));
+
+                       assertEquals(1000000L, 
result.getAllAccumulatorResults().get(accName));
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index a5103cc..fd556d5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,35 +32,41 @@ import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Test that runs an iterative job after a failure in another iterative job.
+ * This test validates that task slots in co-location constraints are properly
+ * freed in the presence of failures.
+ */
 public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
        private static final int PARALLELISM = 16;
        @Test
        public void testSuccessfulProgramAfterFailure() {
                LocalFlinkMiniCluster cluster = null;
-               
+
                try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                        
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
-                       
+
                        cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
 
                        TestEnvironment env = new TestEnvironment(cluster, 
PARALLELISM, false);
-                       
+
                        try {
                                runConnectedComponents(env);
                        }
@@ -68,7 +74,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends 
TestLogger {
                                e.printStackTrace();
                                fail("Program Execution should have 
succeeded.");
                        }
-       
+
                        try {
                                runKMeans(env);
                                fail("This program execution should have 
failed.");
@@ -76,7 +82,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends 
TestLogger {
                        catch (JobExecutionException e) {
                                
assertTrue(e.getCause().getMessage().contains("Insufficient number of network 
buffers"));
                        }
-       
+
                        try {
                                runConnectedComponents(env);
                        }
@@ -95,9 +101,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends 
TestLogger {
                        }
                }
        }
-       
+
        private static void runConnectedComponents(ExecutionEnvironment env) 
throws Exception {
-               
+
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
 
@@ -166,7 +172,7 @@ public class SuccessAfterNetworkBuffersFailureITCase 
extends TestLogger {
                                .map(new 
KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
 
                clusteredPoints.output(new 
DiscardingOutputFormat<Tuple2<Integer, KMeans.Point>>());
-               
+
                env.execute("KMeans Example");
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
new file mode 100644
index 0000000..b4bd213
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.ValueCollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link Aggregations} applied as full, grouped, and nested DataSet aggregates.
+ */
+@RunWith(Parameterized.class)
+public class AggregateITCase extends MultipleProgramsTestBase {
+
+       public AggregateITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testFullAggregate() throws Exception {
+               /*
+                * Full aggregate: SUM of field 0 and MAX of field 1 over the whole 3-tuple data set
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+                               .aggregate(Aggregations.SUM, 0)
+                               .and(Aggregations.MAX, 1)
+                               .project(0, 1);
+
+               List<Tuple2<Integer, Long>> result = aggregateDs.collect();
+
+               String expected = "231,6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testFullAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Full aggregate of mutable value types: same SUM/MAX pipeline on IntValue/LongValue fields
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+                               .aggregate(Aggregations.SUM, 0)
+                               .and(Aggregations.MAX, 1)
+                               .project(0, 1);
+
+               List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+               String expected = "231,6\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testGroupedAggregate() throws Exception {
+               /*
+                * Grouped aggregate: SUM of field 0 per group of field 1, projected as (field 1, field 0)
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.SUM, 0)
+                               .project(1, 0);
+
+               List<Tuple2<Long, Integer>> result = aggregateDs.collect();
+
+               String expected = "1,1\n" +
+                               "2,5\n" +
+                               "3,15\n" +
+                               "4,34\n" +
+                               "5,65\n" +
+                               "6,111\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Grouped aggregate of mutable value types; project(1, 0) yields (LongValue, IntValue), so the result is typed that way
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple2<LongValue, IntValue>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.SUM, 0)
+                               .project(1, 0);
+
+               List<Tuple2<LongValue, IntValue>> result = aggregateDs.collect();
+
+               String expected = "1,1\n" +
+                               "2,5\n" +
+                               "3,15\n" +
+                               "4,34\n" +
+                               "5,65\n" +
+                               "6,111\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testNestedAggregate() throws Exception {
+               /*
+                * Nested aggregate: MIN of field 0 per group of field 1, followed by MIN of field 0 over those results
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.MIN, 0)
+                               .aggregate(Aggregations.MIN, 0)
+                               .project(0);
+
+               List<Tuple1<Integer>> result = aggregateDs.collect();
+
+               String expected = "1\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testNestedAggregateOfMutableValueTypes() throws Exception {
+               /*
+                * Nested aggregate of mutable value types: same MIN-of-MIN pipeline on IntValue fields
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+                               .aggregate(Aggregations.MIN, 0)
+                               .aggregate(Aggregations.MIN, 0)
+                               .project(0);
+
+               List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+               String expected = "1\n";
+
+               compareResultAsTuples(result, expected);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
new file mode 100644
index 0000000..4108b24
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+
+/**
+ * Integration test checking that both {@link CoGroupFunction} inputs arrive in the requested group sort order.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupGroupSortITCase extends JavaProgramTestBase {
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
+                               new Tuple2<Long, Long>(0L, 5L),
+                               new Tuple2<Long, Long>(0L, 4L),
+                               new Tuple2<Long, Long>(0L, 3L),
+                               new Tuple2<Long, Long>(0L, 2L),
+                               new Tuple2<Long, Long>(0L, 1L),
+                               new Tuple2<Long, Long>(1L, 10L),
+                               new Tuple2<Long, Long>(1L, 8L),
+                               new Tuple2<Long, Long>(1L, 9L),
+                               new Tuple2<Long, Long>(1L, 7L));
+
+               DataSet<TestPojo> input2 = env.fromElements(
+                               new TestPojo(0L, 10L, 3L),
+                               new TestPojo(0L, 8L, 3L),
+                               new TestPojo(0L, 10L, 1L),
+                               new TestPojo(0L, 9L, 0L),
+                               new TestPojo(0L, 8L, 2L),
+                               new TestPojo(0L, 8L, 4L),
+                               new TestPojo(1L, 10L, 3L),
+                               new TestPojo(1L, 8L, 3L),
+                               new TestPojo(1L, 10L, 1L),
+                               new TestPojo(1L, 9L, 0L),
+                               new TestPojo(1L, 8L, 2L),
+                               new TestPojo(1L, 8L, 4L));
+
+               input1.coGroup(input2)
+               .where(1).equalTo("b")
+               .sortFirstGroup(0, Order.DESCENDING)
+               .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+
+               .with(new ValidatingCoGroup())
+               .output(new DiscardingOutputFormat<NullValue>());
+
+               env.execute();
+       }
+
+       private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
+
+               @Override
+               public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
+                       // validate the tuple input: sorted on field 0, descending (field 1 is the key and constant within a group)
+                       {
+                               long lastValue = Long.MAX_VALUE;
+
+                               for (Tuple2<Long, Long> t : first) {
+                                       long current = t.f0;
+                                       Assert.assertTrue(current <= lastValue);
+                                       lastValue = current;
+                               }
+                       }
+
+                       // validate the pojo input: ascending in c, and descending in a among equal c
+                       {
+                               TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
+
+                               for (TestPojo current : second) {
+                                       Assert.assertTrue(current.c >= lastValue.c);
+                                       Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
+
+                                       lastValue = current;
+                               }
+                       }
+
+               }
+       }
+
+       /**
+        * Test POJO with three public long fields; {@code b} is the co-group key, {@code c} and {@code a} are the sort fields.
+        */
+       public static class TestPojo implements Cloneable {
+               public long a;
+               public long b;
+               public long c;
+
+               public TestPojo() {}
+
+               public TestPojo(long a, long b, long c) {
+                       this.a = a;
+                       this.b = b;
+                       this.c = c;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
new file mode 100644
index 0000000..453f525
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -0,0 +1,989 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CoGroupFunction} and {@link 
RichCoGroupFunction}.
+ */
+@RunWith(Parameterized.class)
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for the given execution mode, which is handed on to the base class.
+        */
+       public CoGroupITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * CoGroup of two 5-tuple data sets, both keyed by tuple field 0.
+        */
+       @Test
+       public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+               final String expected = "1,0\n" +
+                               "2,6\n" +
+                               "3,24\n" +
+                               "4,60\n" +
+                               "5,120\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.get5TupleDataSet(env);
+
+               DataSet<Tuple2<Integer, Integer>> coGrouped = left
+                               .coGroup(right)
+                               .where(0)
+                               .equalTo(0)
+                               .with(new Tuple5CoGroup());
+
+               List<Tuple2<Integer, Integer>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * CoGroup of two custom type data sets, each keyed through its own key extractor.
+        */
+       @Test
+       public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+               final String expected = "1,0,test\n" +
+                               "2,6,test\n" +
+                               "3,24,test\n" +
+                               "4,60,test\n" +
+                               "5,120,test\n" +
+                               "6,210,test\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<CustomType> left = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> right = CollectionDataSets.getCustomTypeDataSet(env);
+
+               DataSet<CustomType> coGrouped = left
+                               .coGroup(right)
+                               .where(new KeySelector4())
+                               .equalTo(new KeySelector5())
+                               .with(new CustomTypeCoGroup());
+
+               List<CustomType> actual = coGrouped.collect();
+               compareResultAsText(actual, expected);
+       }
+
+       /**
+        * Key extractor that returns the {@code myInt} field of a {@link CustomType}.
+        */
+       private static class KeySelector4 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer getKey(CustomType value) {
+                       return value.myInt;
+               }
+       }
+
+       /**
+        * Key extractor that returns the {@code myInt} field of a {@link CustomType}.
+        */
+       private static class KeySelector5 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer getKey(CustomType value) {
+                       return value.myInt;
+               }
+       }
+
+       /**
+        * Checks the correctness of CoGroup when the UDF ({@code Tuple3ReturnLeft}) returns left input objects.
+        */
+       @Test
+       public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
+               final String expected = "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "4,3,Hello world, how are you?\n" +
+                               "5,3,I am fine.\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> right = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = left
+                               .coGroup(right)
+                               .where(0)
+                               .equalTo(0)
+                               .with(new Tuple3ReturnLeft());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * Checks the correctness of CoGroup when the UDF ({@code Tuple5ReturnRight}) returns right input objects.
+        */
+       @Test
+       public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
+               final String expected = "1,1,0,Hallo,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "3,4,3,Hallo Welt wie gehts?,2\n" +
+                               "3,5,4,ABC,2\n" +
+                               "3,6,5,BCD,3\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.get5TupleDataSet(env);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGrouped = left
+                               .coGroup(right)
+                               .where(0)
+                               .equalTo(0)
+                               .with(new Tuple5ReturnRight());
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * CoGroup with a broadcast set: the integer data set is attached to the UDF under the name "ints".
+        */
+       @Test
+       public void testCoGroupWithBroadcastSet() throws Exception {
+               final String expected = "1,0,55\n" +
+                               "2,6,55\n" +
+                               "3,24,55\n" +
+                               "4,60,55\n" +
+                               "5,120,55\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.get5TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = left
+                               .coGroup(right)
+                               .where(0)
+                               .equalTo(0)
+                               .with(new Tuple5CoGroupBC())
+                               .withBroadcastSet(broadcastInts, "ints");
+
+               List<Tuple3<Integer, Integer, Integer>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * CoGroup on a tuple input with key field selector (first input) and a custom type input with key extractor (second input).
+        */
+       @Test
+       public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+                       throws Exception {
+               final String expected = "0,1,test\n" +
+                               "1,2,test\n" +
+                               "2,5,test\n" +
+                               "3,15,test\n" +
+                               "4,33,test\n" +
+                               "5,63,test\n" +
+                               "6,109,test\n" +
+                               "7,4,test\n" +
+                               "8,4,test\n" +
+                               "9,4,test\n" +
+                               "10,5,test\n" +
+                               "11,5,test\n" +
+                               "12,5,test\n" +
+                               "13,5,test\n" +
+                               "14,5,test\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = tuples
+                               .coGroup(customs)
+                               .where(2)
+                               .equalTo(new KeySelector2())
+                               .with(new MixedCoGroup());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * Key extractor that returns the {@code myInt} field of a {@link CustomType}.
+        */
+       private static class KeySelector2 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer getKey(CustomType value) {
+                       return value.myInt;
+               }
+       }
+
+       /**
+        * CoGroup on a custom type input with key extractor (first input) and a tuple input with key field selector (second input).
+        */
+       @Test
+       public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+                       throws Exception {
+               final String expected = "0,1,test\n" +
+                               "1,2,test\n" +
+                               "2,5,test\n" +
+                               "3,15,test\n" +
+                               "4,33,test\n" +
+                               "5,63,test\n" +
+                               "6,109,test\n" +
+                               "7,4,test\n" +
+                               "8,4,test\n" +
+                               "9,4,test\n" +
+                               "10,5,test\n" +
+                               "11,5,test\n" +
+                               "12,5,test\n" +
+                               "13,5,test\n" +
+                               "14,5,test\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(env);
+
+               DataSet<CustomType> coGrouped = customs
+                               .coGroup(tuples)
+                               .where(new KeySelector3())
+                               .equalTo(2)
+                               .with(new MixedCoGroup2());
+
+               List<CustomType> actual = coGrouped.collect();
+               compareResultAsText(actual, expected);
+       }
+
+       /**
+        * Key extractor that returns the {@code myInt} field of a {@link CustomType}.
+        */
+       private static class KeySelector3 implements KeySelector<CustomType, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer getKey(CustomType value) {
+                       return value.myInt;
+               }
+       }
+
+       /**
+        * CoGroup with multiple key fields: fields (0, 4) of the 5-tuples against fields (0, 1) of the 3-tuples.
+        */
+       @Test
+       public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
+               final String expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = fiveTuples
+                               .coGroup(threeTuples)
+                               .where(0, 4)
+                               .equalTo(0, 1)
+                               .with(new Tuple5Tuple3CoGroup());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * CoGroup with multiple key fields, keyed through the {@code KeySelector7} and {@code KeySelector8} classes.
+        */
+       @Test
+       public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
+               final String expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = fiveTuples
+                               .coGroup(threeTuples)
+                               .where(new KeySelector7())
+                               .equalTo(new KeySelector8())
+                               .with(new Tuple5Tuple3CoGroup());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       /**
+        * CoGroup with multiple key fields, using anonymous inner classes for both key extractors and the UDF.
+        * The closure cleaner is not disabled here, so this tests that it works for inner classes.
+        */
+       @Test
+       public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+               final String expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+               // the functions must stay anonymous inner classes: they are what this test exercises
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = fiveTuples
+                               .coGroup(threeTuples)
+                               .where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+                                       @Override
+                                       public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception {
+                                               return new Tuple2<Integer, Long>(value.f0, value.f4);
+                                       }
+                               })
+                               .equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+                                       @Override
+                                       public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) {
+                                               return new Tuple2<>(value.f0, value.f1);
+                                       }
+                               })
+                               .with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+                                       @Override
+                                       public void coGroup(
+                                                       Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+                                                       Iterable<Tuple3<Integer, Long, String>> second,
+                                                       Collector<Tuple3<Integer, Long, String>> out) {
+                                               // gather field 3 of every left-side record of this group
+                                               List<String> leftTexts = new ArrayList<>();
+                                               for (Tuple5<Integer, Long, Integer, String, Long> leftRecord : first) {
+                                                       leftTexts.add(leftRecord.f3);
+                                               }
+
+                                               // emit one record per (right-side record, gathered text) combination
+                                               for (Tuple3<Integer, Long, String> rightRecord : second) {
+                                                       for (String text : leftTexts) {
+                                                               out.collect(new Tuple3<Integer, Long, String>(rightRecord.f0, rightRecord.f1, text));
+                                                       }
+                                               }
+                                       }
+                               });
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+               compareResultAsTuples(actual, expected);
+       }
+
+       @Test
+       public void 
testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner()
 throws Exception {
+               /*
+                * CoGroup with multiple key fields, test that disabling 
closure cleaner leads to an exception when using inner
+                * classes.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().disableClosureCleaner();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = 
CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
+               boolean correctExceptionTriggered = false;
+               try {
+                       DataSet<Tuple3<Integer, Long, String>> coGrouped = 
ds1.coGroup(ds2).
+                                       where(new KeySelector<Tuple5<Integer, 
Long, Integer, String, Long>,
+                                                       Tuple2<Integer, 
Long>>() {
+                                               @Override
+                                               public Tuple2<Integer, Long> 
getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+                                                       return new 
Tuple2<Integer, Long>(t.f0, t.f4);
+                                               }
+                                       }).
+                                       equalTo(new KeySelector<Tuple3<Integer, 
Long, String>, Tuple2<Integer, Long>>() {
+
+                                               @Override
+                                               public Tuple2<Integer, Long> 
getKey(Tuple3<Integer, Long, String> t) {
+                                                       return new 
Tuple2<Integer, Long>(t.f0, t.f1);
+                                               }
+                                       }).
+                                       with(new 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>() {
+                                               @Override
+                                               public void 
coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+                                                                       
Iterable<Tuple3<Integer, Long, String>> second,
+                                                                       
Collector<Tuple3<Integer, Long, String>> out) {
+                                                       List<String> strs = new 
ArrayList<String>();
+
+                                                       for (Tuple5<Integer, 
Long, Integer, String, Long> t : first) {
+                                                               strs.add(t.f3);
+                                                       }
+
+                                                       for (Tuple3<Integer, 
Long, String> t : second) {
+                                                               for (String s : 
strs) {
+                                                                       
out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+                                                               }
+                                                       }
+                                               }
+                                       });
+               } catch (InvalidProgramException ex) {
+                       correctExceptionTriggered = (ex.getCause() instanceof 
java.io.NotSerializableException);
+               }
+               Assert.assertTrue(correctExceptionTriggered);
+
+       }
+
+       private static class KeySelector7 implements 
KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+       Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, 
Integer, String, Long> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+               }
+       }
+
+       private static class KeySelector8 implements 
KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, 
String> t) {
+                       return new Tuple2<Integer, Long>(t.f0, t.f1);
+               }
+       }
+
+       @Test
+       public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
+               /*
+                * Co-groups two CustomType data sets on their "myInt" field, which is selected
+                * through an expression key on both sides.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> left = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> right = CollectionDataSets.getCustomTypeDataSet(env);
+               DataSet<CustomType> coGrouped = left
+                               .coGroup(right)
+                               .where("myInt")
+                               .equalTo("myInt")
+                               .with(new CustomTypeCoGroup());
+
+               List<CustomType> actual = coGrouped.collect();
+
+               String expected = "1,0,test\n" +
+                               "2,6,test\n" +
+                               "3,24,test\n" +
+                               "4,60,test\n" +
+                               "5,120,test\n" +
+                               "6,210,test\n";
+
+               compareResultAsText(actual, expected);
+       }
+
+       @Test
+       public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws Exception {
+               /*
+                * Co-groups a POJO data set, keyed by the expression "nestedPojo.longNumber",
+                * with a tuple data set, keyed by field position 6.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                               CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGrouped = pojos
+                               .coGroup(tuples)
+                               .where("nestedPojo.longNumber")
+                               .equalTo(6)
+                               .with(new CoGroup1());
+
+               List<CustomType> actual = coGrouped.collect();
+
+               String expected = "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+
+               compareResultAsText(actual, expected);
+       }
+
+       private static class CoGroup1 implements CoGroupFunction<POJO, 
Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for (POJO p : first) {
+                               for (Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
+                       }
+               }
+       }
+
+       @Test
+       public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
+               /*
+                * Co-groups a POJO data set, keyed by a key selector function, with a tuple data
+                * set, keyed by field position 6. The key selector deliberately wraps the key in
+                * a Tuple1, which is more complicated than necessary.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                               CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGrouped = pojos
+                               .coGroup(tuples)
+                               .where(new KeySelector6())
+                               .equalTo(6)
+                               .with(new CoGroup3());
+
+               List<CustomType> actual = coGrouped.collect();
+
+               String expected = "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+
+               compareResultAsText(actual, expected);
+       }
+
+       /**
+        * Key selector that wraps the nested {@code longNumber} of a {@code POJO} in a {@link Tuple1}.
+        */
+       private static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple1<Long> getKey(POJO pojo) throws Exception {
+                       final Tuple1<Long> key = new Tuple1<Long>(pojo.nestedPojo.longNumber);
+                       return key;
+               }
+       }
+
+       private static class CoGroup3 implements CoGroupFunction<POJO, 
Tuple7<Integer,
+                       String, Integer, Integer, Long, String, Long>, 
CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for (POJO p : first) {
+                               for (Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
+                       }
+               }
+       }
+
+       @Test
+       public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+               /*
+                * Co-groups a POJO data set, keyed by a key selector function that returns the
+                * plain key value, with a tuple data set, keyed by field position 6.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+               DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples =
+                               CollectionDataSets.getSmallTuplebasedDataSet(env);
+               DataSet<CustomType> coGrouped = pojos
+                               .coGroup(tuples)
+                               .where(new KeySelector1())
+                               .equalTo(6)
+                               .with(new CoGroup2());
+
+               List<CustomType> actual = coGrouped.collect();
+
+               String expected = "-1,20000,Flink\n" +
+                               "-1,10000,Flink\n" +
+                               "-1,30000,Flink\n";
+
+               compareResultAsText(actual, expected);
+       }
+
+       @Test
+       public void testCoGroupWithAtomicType1() throws Exception {
+               /*
+                * Co-groups a tuple data set (keyed by field 0) with a data set of plain integers
+                * (keyed by the wildcard expression "*").
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Integer> numbers = env.fromElements(0, 1, 2);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = tuples
+                               .coGroup(numbers)
+                               .where(0)
+                               .equalTo("*")
+                               .with(new CoGroupAtomic1());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+               String expected = "(1,1,Hi)\n" +
+                       "(2,2,Hello)";
+
+               compareResultAsText(actual, expected);
+       }
+
+       @Test
+       public void testCoGroupWithAtomicType2() throws Exception {
+               /*
+                * Co-groups a data set of plain integers (keyed by the wildcard expression "*")
+                * with a tuple data set (keyed by field 0).
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Integer> numbers = env.fromElements(0, 1, 2);
+               DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = numbers
+                               .coGroup(tuples)
+                               .where("*")
+                               .equalTo(0)
+                               .with(new CoGroupAtomic2());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+               String expected = "(1,1,Hi)\n" +
+                       "(2,2,Hello)";
+
+               compareResultAsText(actual, expected);
+       }
+
+       @Test
+       public void testCoGroupWithRangePartitioning() throws Exception {
+               /*
+                * Co-groups two tuple data sets on two key fields each, after range-partitioning
+                * both inputs on those fields with the same custom distribution.
+                */
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+               env.setParallelism(4);
+               TestDistribution distribution = new TestDistribution();
+
+               // Partition each side on its co-group key fields.
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> partitionedFirst =
+                               DataSetUtils.partitionByRange(fiveTuples, distribution, 0, 4);
+               DataSet<Tuple3<Integer, Long, String>> partitionedSecond =
+                               DataSetUtils.partitionByRange(threeTuples, distribution, 0, 1);
+
+               DataSet<Tuple3<Integer, Long, String>> coGrouped = partitionedFirst
+                               .coGroup(partitionedSecond)
+                               .where(0, 4)
+                               .equalTo(0, 1)
+                               .with(new Tuple5Tuple3CoGroup());
+
+               List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+               String expected = "1,1,Hallo\n" +
+                               "2,2,Hallo Welt\n" +
+                               "3,2,Hallo Welt wie gehts?\n" +
+                               "3,2,ABC\n" +
+                               "5,3,HIJ\n" +
+                               "5,3,IJK\n";
+
+               compareResultAsTuples(actual, expected);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  UDF classes
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Key selector that returns the nested {@code longNumber} of a {@code POJO} as the key.
+        */
+       private static class KeySelector1 implements KeySelector<POJO, Long> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Long getKey(POJO pojo) throws Exception {
+                       final Long key = pojo.nestedPojo.longNumber;
+                       return key;
+               }
+       }
+
+       private static class CoGroup2 implements CoGroupFunction<POJO, 
Tuple7<Integer, String,
+                       Integer, Integer, Long, String, Long>, CustomType> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<POJO> first,
+                               Iterable<Tuple7<Integer, String, Integer, 
Integer, Long, String, Long>> second,
+                               Collector<CustomType> out) throws Exception {
+                       for (POJO p : first) {
+                               for (Tuple7<Integer, String, Integer, Integer, 
Long, String, Long> t: second) {
+                                       
Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
+                                       out.collect(new CustomType(-1, 
p.nestedPojo.longNumber, "Flink"));
+                               }
+                       }
+               }
+       }
+
+       private static class Tuple5CoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<Tuple2<Integer, Integer>> out) {
+                       int sum = 0;
+                       int id = 0;
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : first) {
+                               sum += element.f2;
+                               id = element.f0;
+                       }
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
+                               sum += element.f2;
+                               id = element.f0;
+                       }
+
+                       out.collect(new Tuple2<Integer, Integer>(id, sum));
+               }
+       }
+
+       /**
+        * Co-group function that folds both inputs into one {@code CustomType}: {@code myLong}
+        * accumulates over all elements, {@code myInt} is taken from the last element seen and
+        * the string is always {@code "test"}.
+        */
+       private static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
+
+                       CustomType merged = new CustomType(0, 0, "test");
+
+                       for (CustomType left : first) {
+                               merged.myLong += left.myLong;
+                               merged.myInt = left.myInt;
+                       }
+
+                       for (CustomType right : second) {
+                               merged.myLong += right.myLong;
+                               merged.myInt = right.myInt;
+                       }
+
+                       out.collect(merged);
+               }
+       }
+
+       private static class MixedCoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, 
Tuple3<Integer, Long, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<CustomType> second,
+                               Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
+
+                       long sum = 0;
+                       int id = 0;
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : first) {
+                               sum += element.f0;
+                               id = element.f2;
+                       }
+
+                       for (CustomType element : second) {
+                               id = element.myInt;
+                               sum += element.myLong;
+                       }
+
+                       out.collect(new Tuple3<Integer, Long, String>(id, sum, 
"test"));
+               }
+
+       }
+
+       private static class MixedCoGroup2 implements 
CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, 
CustomType> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<CustomType> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<CustomType> out) {
+                       CustomType o = new CustomType(0, 0, "test");
+
+                       for (CustomType element : first) {
+                               o.myInt = element.myInt;
+                               o.myLong += element.myLong;
+                       }
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
+                               o.myInt = element.f2;
+                               o.myLong += element.f0;
+                       }
+
+                       out.collect(o);
+
+               }
+
+       }
+
+       private static class Tuple3ReturnLeft implements 
CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<Tuple3<Integer, Long, String>> 
first,
+                               Iterable<Tuple3<Integer, Long, String>> second,
+                               Collector<Tuple3<Integer, Long, String>> out) {
+                       for (Tuple3<Integer, Long, String> element : first) {
+                               if (element.f0 < 6) {
+                                       out.collect(element);
+                               }
+                       }
+               }
+       }
+
+       private static class Tuple5ReturnRight implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<Tuple5<Integer, Long, Integer, 
String, Long>> out) {
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
+                               if (element.f0 < 4) {
+                                       out.collect(element);
+                               }
+                       }
+               }
+       }
+
+       private static class Tuple5CoGroupBC extends 
RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, 
Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, 
Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               private int broadcast = 42;
+
+               @Override
+               public void open(Configuration config) {
+
+                       Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       broadcast = sum;
+
+               }
+
+               @Override
+               public void coGroup(
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> first,
+                               Iterable<Tuple5<Integer, Long, Integer, String, 
Long>> second,
+                               Collector<Tuple3<Integer, Integer, Integer>> 
out) {
+                       int sum = 0;
+                       int id = 0;
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : first) {
+                               sum += element.f2;
+                               id = element.f0;
+                       }
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> 
element : second) {
+                               sum += element.f2;
+                               id = element.f0;
+                       }
+
+                       out.collect(new Tuple3<Integer, Integer, Integer>(id, 
sum, broadcast));
+               }
+       }
+
+       private static class Tuple5Tuple3CoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<Tuple5<Integer, Long, Integer, 
String, Long>> first,
+                               Iterable<Tuple3<Integer, Long, String>> second,
+                               Collector<Tuple3<Integer, Long, String>> out) {
+                       List<String> strs = new ArrayList<String>();
+
+                       for (Tuple5<Integer, Long, Integer, String, Long> t : 
first) {
+                               strs.add(t.f3);
+                       }
+
+                       for (Tuple3<Integer, Long, String> t : second) {
+                               for (String s : strs) {
+                                       out.collect(new Tuple3<Integer, Long, 
String>(t.f0, t.f1, s));
+                               }
+                       }
+               }
+       }
+
+       private static class CoGroupAtomic1 implements 
CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, 
String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<Tuple3<Integer, Long, String>> 
first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) 
throws Exception {
+                       List<Integer> ints = new ArrayList<Integer>();
+
+                       for (Integer i : second) {
+                               ints.add(i);
+                       }
+
+                       for (Tuple3<Integer, Long, String> t : first) {
+                               for (Integer i : ints) {
+                                       if (t.f0.equals(i)) {
+                                               out.collect(t);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private static class CoGroupAtomic2 implements CoGroupFunction<Integer, 
Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void coGroup(Iterable<Integer> first, 
Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, 
String>> out) throws Exception {
+                       List<Integer> ints = new ArrayList<Integer>();
+
+                       for (Integer i : first) {
+                               ints.add(i);
+                       }
+
+                       for (Tuple3<Integer, Long, String> t : second) {
+                               for (Integer i : ints) {
+                                       if (t.f0.equals(i)) {
+                                               out.collect(t);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Test {@link DataDistribution} with four fixed bucket boundaries over an
+        * {@code (Integer, Long)} key.
+        *
+        * <p>The distribution carries no configurable state: {@link #write} and {@link #read} are
+        * no-ops and all instances are considered equal.
+        */
+       public static class TestDistribution implements DataDistribution {
+               // One row per bucket; each row holds the (Integer, Long) upper boundary of that bucket.
+               public Object[][] boundaries = new Object[][]{
+                               new Object[]{2, 2L},
+                               new Object[]{5, 4L},
+                               new Object[]{10, 12L},
+                               new Object[]{21, 6L}
+               };
+
+               public TestDistribution() {}
+
+               /**
+                * Returns the fixed boundary row for {@code bucketNum}; {@code totalNumBuckets} is ignored.
+                */
+               @Override
+               public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+                       return boundaries[bucketNum];
+               }
+
+               @Override
+               public int getNumberOfFields() {
+                       return 2;
+               }
+
+               @Override
+               public TypeInformation[] getKeyTypes() {
+                       return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       // Nothing to serialize: the boundaries are hard-coded.
+               }
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       // Nothing to deserialize: the boundaries are hard-coded.
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj instanceof TestDistribution;
+               }
+
+               /**
+                * All instances are equal, so they must share one hash code. The value is derived
+                * from the class name so that it does not depend on object identity.
+                */
+               @Override
+               public int hashCode() {
+                       return TestDistribution.class.getName().hashCode();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
new file mode 100644
index 0000000..6e61f60
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.RichCrossFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CrossFunction} and {@link RichCrossFunction}.
+ */
+@RunWith(Parameterized.class)
+public class CrossITCase extends MultipleProgramsTestBase {
+
+       public CrossITCase(TestExecutionMode mode){
+               super(mode); // hand the execution mode to MultipleProgramsTestBase
+       }
+
+       @Test
+       public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
+               /*
+                * Crosses the small 5-tuple data set with itself using Tuple5Cross and checks the collected tuples.
+                */
+               // NOTE(review): "Corretness" in the method name looks like a typo for "Correctness".
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.cross(ds2).with(new Tuple5Cross());
+
+               List<Tuple2<Integer, String>> result = crossDs.collect();
+
+               String expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws 
Exception {
+               /*
+                * Crosses the small 3-tuple set with the small 5-tuple set using Tuple3ReturnLeft; each left tuple is expected three times.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new Tuple3ReturnLeft());
+
+               List<Tuple3<Integer, Long, String>> result = crossDs.collect();
+
+               String expected = "1,1,Hi\n" +
+                               "1,1,Hi\n" +
+                               "1,1,Hi\n" +
+                               "2,2,Hello\n" +
+                               "2,2,Hello\n" +
+                               "2,2,Hello\n" +
+                               "3,2,Hello world\n" +
+                               "3,2,Hello world\n" +
+                               "3,2,Hello world\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws 
Exception {
+               /*
+                * Crosses the small 3-tuple set with the small 5-tuple set using Tuple5ReturnRight; each right tuple is expected three times.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = 
ds.cross(ds2).with(new Tuple5ReturnRight());
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> result = 
crossDs
+                               .collect();
+
+               String expected = "1,1,0,Hallo,1\n" +
+                               "1,1,0,Hallo,1\n" +
+                               "1,1,0,Hallo,1\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,2,1,Hallo Welt,2\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n" +
+                               "2,3,2,Hallo Welt wie,1\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+               /*
+                * Crosses the small 5-tuple set with itself using Tuple5CrossBC, which gets the integer data set as broadcast set "ints"; every expected row ends in 55.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple3<Integer, Integer, Integer>> crossDs = 
ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
+
+               List<Tuple3<Integer, Integer, Integer>> result = 
crossDs.collect();
+
+               String expected = "2,0,55\n" +
+                               "3,0,55\n" +
+                               "3,0,55\n" +
+                               "3,0,55\n" +
+                               "4,1,55\n" +
+                               "4,2,55\n" +
+                               "3,0,55\n" +
+                               "4,2,55\n" +
+                               "4,4,55\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossWithHuge() throws Exception {
+               /*
+                * Uses crossWithHuge with Tuple5Cross; only the result is checked, and the expected text is the same as for the plain cross.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithHuge(ds2).with(new Tuple5Cross());
+
+               List<Tuple2<Integer, String>> result = crossDs.collect();
+
+               String expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossWithTiny() throws Exception {
+               /*
+                * Uses crossWithTiny with Tuple5Cross; only the result is checked, and the expected text is the same as for the plain cross.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Integer, String>> crossDs = 
ds.crossWithTiny(ds2).with(new Tuple5Cross());
+
+               List<Tuple2<Integer, String>> result = crossDs.collect();
+
+               String expected = "0,HalloHallo\n" +
+                               "1,HalloHallo Welt\n" +
+                               "2,HalloHallo Welt wie\n" +
+                               "1,Hallo WeltHallo\n" +
+                               "2,Hallo WeltHallo Welt\n" +
+                               "3,Hallo WeltHallo Welt wie\n" +
+                               "2,Hallo Welt wieHallo\n" +
+                               "3,Hallo Welt wieHallo Welt\n" +
+                               "4,Hallo Welt wieHallo Welt wie\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testProjectCrossOnATupleInput1() throws Exception{
+               /*
+                * Projects the cross of the 3-tuple and 5-tuple sets to (first.f2, first.f1, second.f3, first.f0, second.f4, second.f1).
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple6<String, Long, String, Integer, Long, Long>> 
crossDs = ds.cross(ds2)
+                               .projectFirst(2, 1)
+                               .projectSecond(3)
+                               .projectFirst(0)
+                               .projectSecond(4, 1);
+
+               List<Tuple6<String, Long, String, Integer, Long, Long>> result 
= crossDs.collect();
+
+               String expected = "Hi,1,Hallo,1,1,1\n" +
+                               "Hi,1,Hallo Welt,1,2,2\n" +
+                               "Hi,1,Hallo Welt wie,1,1,3\n" +
+                               "Hello,2,Hallo,2,1,1\n" +
+                               "Hello,2,Hallo Welt,2,2,2\n" +
+                               "Hello,2,Hallo Welt wie,2,1,3\n" +
+                               "Hello world,2,Hallo,3,1,1\n" +
+                               "Hello world,2,Hallo Welt,3,2,2\n" +
+                               "Hello world,2,Hallo Welt wie,3,1,3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testProjectCrossOnATupleInput2() throws Exception {
+               /*
+                * Projects the cross of the 3-tuple and 5-tuple sets to (second.f3, first.f2, first.f1, second.f4, second.f1, first.f0).
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple6<String, String, Long, Long, Long, Integer>> 
crossDs = ds.cross(ds2)
+                               .projectSecond(3)
+                               .projectFirst(2, 1)
+                               .projectSecond(4, 1)
+                               .projectFirst(0);
+
+               List<Tuple6<String, String, Long, Long, Long, Integer>> result 
= crossDs.collect();
+
+               String expected = "Hallo,Hi,1,1,1,1\n" +
+                               "Hallo Welt,Hi,1,2,2,1\n" +
+                               "Hallo Welt wie,Hi,1,1,3,1\n" +
+                               "Hallo,Hello,2,1,1,2\n" +
+                               "Hallo Welt,Hello,2,2,2,2\n" +
+                               "Hallo Welt wie,Hello,2,1,3,2\n" +
+                               "Hallo,Hello world,2,1,1,3\n" +
+                               "Hallo Welt,Hello world,2,2,2,3\n" +
+                               "Hallo Welt wie,Hello world,2,1,3,3\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfDefaultCross() throws Exception {
+               /*
+                * Crosses the 3-tuple and 5-tuple sets without a cross function; each result is a Tuple2 of (left tuple, right tuple).
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, 
Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
+
+               List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, 
Long, Integer, String, Long>>> result = crossDs.collect();
+
+               String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n"
+                               +
+                               "(1,1,Hi),(1,1,0,Hallo,1)\n" +
+                               "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
+                               "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
+                               "(2,2,Hello),(1,1,0,Hallo,1)\n" +
+                               "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+                               "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
+                               "(3,2,Hello world),(1,1,0,Hallo,1)\n" +
+                               "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws 
Exception {
+               /*
+                * Crosses the small CustomType set with itself using CustomTypeCross; the result is checked with compareResultAsText.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<CustomType> ds = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<CustomType> crossDs = ds.cross(ds2).with(new 
CustomTypeCross());
+
+               List<CustomType> result = crossDs.collect();
+
+               String expected = "1,0,HiHi\n"
+                               + "2,1,HiHello\n"
+                               + "2,2,HiHello world\n"
+                               + "2,1,HelloHi\n"
+                               + "4,2,HelloHello\n"
+                               + "4,3,HelloHello world\n"
+                               + "2,2,Hello worldHi\n"
+                               + "4,3,Hello worldHello\n"
+                               + "4,4,Hello worldHello world";
+
+               compareResultAsText(result, expected);
+       }
+
+       @Test
+       public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() 
throws Exception {
+               /*
+                * Crosses the small 5-tuple set (left) with the small CustomType set (right) using MixedCross.
+                */
+
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = 
CollectionDataSets.getSmall5TupleDataSet(env);
+               DataSet<CustomType> ds2 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+               DataSet<Tuple3<Integer, Long, String>> crossDs = 
ds.cross(ds2).with(new MixedCross());
+
+               List<Tuple3<Integer, Long, String>> result = crossDs.collect();
+
+               String expected = "2,0,HalloHi\n" +
+                               "3,0,HalloHello\n" +
+                               "3,0,HalloHello world\n" +
+                               "3,0,Hallo WeltHi\n" +
+                               "4,1,Hallo WeltHello\n" +
+                               "4,2,Hallo WeltHello world\n" +
+                               "3,0,Hallo Welt wieHi\n" +
+                               "4,2,Hallo Welt wieHello\n" +
+                               "4,4,Hallo Welt wieHello world\n";
+
+               compareResultAsTuples(result, expected);
+       }
+
+       private static class Tuple5Cross implements 
CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, String>> {
+               // Returns (first.f2 + second.f2, first.f3 + second.f3): an Integer sum and a String concatenation.
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, String> cross(
+                               Tuple5<Integer, Long, Integer, String, Long> 
first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second)
+                               throws Exception {
+
+                               return new Tuple2<Integer, String>(first.f2 + 
second.f2, first.f3 + second.f3);
+               }
+
+       }
+
+       private static class CustomTypeCross implements 
CrossFunction<CustomType, CustomType, CustomType> {
+               // Builds a new CustomType from myInt * myInt, myLong + myLong and myString + myString of the two inputs.
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public CustomType cross(CustomType first, CustomType second)
+                               throws Exception {
+
+                       return new CustomType(first.myInt * second.myInt, 
first.myLong + second.myLong, first.myString + second.myString);
+               }
+
+       }
+
+       private static class MixedCross implements 
CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, 
Tuple3<Integer, Long, String>> {
+               // Returns (first.f0 + second.myInt, first.f2 * second.myLong, first.f3 + second.myString).
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple3<Integer, Long, String> cross(
+                               Tuple5<Integer, Long, Integer, String, Long> 
first,
+                               CustomType second) throws Exception {
+
+                       return new Tuple3<Integer, Long, String>(first.f0 + 
second.myInt, first.f2 * second.myLong, first.f3 + second.myString);
+               }
+
+       }
+
+       private static class Tuple3ReturnLeft implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
+               // Returns the left input object itself; no copy is made and the right input is unused.
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple3<Integer, Long, String> cross(
+                               Tuple3<Integer, Long, String> first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second) throws Exception {
+
+                       return first;
+               }
+       }
+
+       private static class Tuple5ReturnRight implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+               // Returns the right input object itself; no copy is made and the left input is unused.
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple5<Integer, Long, Integer, String, Long> cross(
+                               Tuple3<Integer, Long, String> first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second)
+                               throws Exception {
+
+                       return second;
+               }
+
+       }
+
+       private static class Tuple5CrossBC extends 
RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+               // Returns (first.f0 + second.f0, first.f2 * second.f2, broadcast); broadcast is set in open().
+               private static final long serialVersionUID = 1L;
+
+               private int broadcast = 42;
+
+               @Override
+               public void open(Configuration config) {
+                       // Sum the "ints" broadcast variable and store the total, replacing the initial 42.
+                       Collection<Integer> ints = 
this.getRuntimeContext().getBroadcastVariable("ints");
+                       int sum = 0;
+                       for (Integer i : ints) {
+                               sum += i;
+                       }
+                       broadcast = sum;
+
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, Integer> cross(
+                               Tuple5<Integer, Long, Integer, String, Long> 
first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second)
+                               throws Exception {
+
+                       return new Tuple3<Integer, Integer, Integer>(first.f0 + 
second.f0, first.f2 * second.f2, broadcast);
+               }
+       }
+}

Reply via email to