http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java index 6215f31..638eb5c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.types.IntValue; + import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +42,13 @@ import java.util.Random; import static org.hamcrest.Matchers.is; -/* +/** * These programs demonstrate the effects of user defined functions which modify input objects or return locally created * objects that are retained and reused on future calls. The programs do not retain and later modify input objects. */ public class OverwriteObjects { - public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class); + public static final Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class); // DataSets are created with this number of elements private static final int NUMBER_OF_ELEMENTS = 3_000_000; @@ -71,7 +72,7 @@ public class OverwriteObjects { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableSysoutLogging(); - for (int parallelism = MAX_PARALLELISM ; parallelism > 0 ; parallelism--) { + for (int parallelism = MAX_PARALLELISM; parallelism > 0; parallelism--) { LOG.info("Parallelism = {}", parallelism); env.setParallelism(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java index c8604cb..46be968 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java @@ -36,7 +36,7 @@ import java.util.Random; * (See also http://peel-framework.org/2016/04/07/hash-aggregations-in-flink.html) */ public class ReducePerformance { - + public static void main(String[] args) throws Exception { final int numElements = 40_000_000; @@ -120,7 +120,7 @@ public class ReducePerformance { int rem = numElements % numPartitions; SplittableRandomIterator<T, B>[] res = new SplittableRandomIterator[numPartitions]; for (int i = 0; i < numPartitions; i++) { - res[i] = new SplittableRandomIterator<T, B>(i < rem ? splitSize : splitSize + 1, (B)baseIterator.copy()); + res[i] = new SplittableRandomIterator<T, B>(i < rem ? 
splitSize : splitSize + 1, (B) baseIterator.copy()); } return res; } @@ -140,7 +140,6 @@ public class ReducePerformance { CopyableIterator<T> copy(); } - private static final class TupleIntIntIterator implements CopyableIterator<Tuple2<Integer, Integer>>, Serializable { private final int keyRange; @@ -183,7 +182,6 @@ public class ReducePerformance { } } - private static final class TupleStringIntIterator implements CopyableIterator<Tuple2<String, Integer>>, Serializable { private final int keyRange; @@ -226,7 +224,6 @@ public class ReducePerformance { } } - private static final class SumReducer<K> implements ReduceFunction<Tuple2<K, Integer>> { @Override public Tuple2<K, Integer> reduce(Tuple2<K, Integer> a, Tuple2<K, Integer> b) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 90dbe80..c7f43fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -31,24 +31,27 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import static org.junit.Assert.fail; +/** + * Manual test to evaluate impact of checkpointing on latency. 
+ */ public class StreamingScalabilityAndLatency { - + public static void main(String[] args) throws Exception { if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) { throw new RuntimeException("This test program needs to run with at least 5GB of heap space."); } - - final int TASK_MANAGERS = 1; - final int SLOTS_PER_TASK_MANAGER = 80; - final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + + final int taskManagers = 1; + final int slotsPerTaskManager = 80; + final int parallelism = taskManagers * slotsPerTaskManager; LocalFlinkMiniCluster cluster = null; try { Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000); config.setInteger("taskmanager.net.server.numThreads", 1); @@ -56,8 +59,8 @@ public class StreamingScalabilityAndLatency { cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); - - runPartitioningProgram(cluster.getLeaderRPCPort(), PARALLELISM); + + runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism); } catch (Exception e) { e.printStackTrace(); @@ -69,7 +72,7 @@ public class StreamingScalabilityAndLatency { } } } - + private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); env.setParallelism(parallelism); @@ -83,23 +86,22 @@ public class StreamingScalabilityAndLatency { .map(new IdMapper<Tuple2<Long, Long>>()) .keyBy(0) .addSink(new TimestampingSink()); - + env.execute("Partitioning Program"); } - - public static class 
TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> { + + private static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> { private static final long serialVersionUID = -151782334777482511L; private volatile boolean running = true; - - + @Override public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { - + long num = 100; long counter = (long) (Math.random() * 4096); - + while (running) { if (num < 100) { num++; @@ -119,14 +121,14 @@ public class StreamingScalabilityAndLatency { running = false; } } - - public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> { + + private static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> { private static final long serialVersionUID = 1876986644706201196L; private long maxLatency; - private long count; - + private long count; + @Override public void invoke(Tuple2<Long, Long> value) { long ts = value.f1; @@ -134,7 +136,7 @@ public class StreamingScalabilityAndLatency { long diff = System.currentTimeMillis() - ts; maxLatency = Math.max(diff, maxLatency); } - + count++; if (count == 5000) { System.out.println("Max latency: " + maxLatency); @@ -144,7 +146,7 @@ public class StreamingScalabilityAndLatency { } } - public static class IdMapper<T> implements MapFunction<T, T> { + private static class IdMapper<T> implements MapFunction<T, T> { private static final long serialVersionUID = -6543809409233225099L; http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java index 1c5744d..bd5123a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java @@ -21,4 +21,5 @@ * need to be manually invoked, because they are extremely heavy, time intensive, * of require larger-than-usual JVMs. */ -package org.apache.flink.test.manual; \ No newline at end of file + +package org.apache.flink.test.manual; http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index eea2509..1fb5e65 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -18,13 +18,10 @@ package org.apache.flink.test.misc; -import static org.junit.Assert.*; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -33,6 +30,7 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -41,6 +39,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + /** * This test verifies that the auto parallelism is 
properly forwarded to the runtime. */ @@ -79,7 +80,6 @@ public class AutoParallelismITCase extends TestLogger { } } - @Test public void testProgramWithAutoParallelism() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java index 39a08d2..b8f1d80 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java @@ -24,8 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.test.util.JavaProgramTestBase; + import org.junit.Assert; +/** + * Integration tests for custom {@link Partitioner}. 
+ */ @SuppressWarnings("serial") public class CustomPartitioningITCase extends JavaProgramTestBase { @@ -36,17 +40,17 @@ public class CustomPartitioningITCase extends JavaProgramTestBase { if (!isCollectionExecution()) { Assert.assertTrue(env.getParallelism() > 1); } - + env.generateSequence(1, 1000) .partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>()) .map(new FailExceptInPartitionZeroMapper()) .output(new DiscardingOutputFormat<Long>()); - + env.execute(); } - - public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> { - + + private static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> { + @Override public Long map(Long value) throws Exception { if (getRuntimeContext().getIndexOfThisSubtask() == 0) { @@ -56,15 +60,15 @@ public class CustomPartitioningITCase extends JavaProgramTestBase { } } } - - public static class AllZeroPartitioner implements Partitioner<Long> { + + private static class AllZeroPartitioner implements Partitioner<Long> { @Override public int partition(Long key, int numPartitions) { return 0; } } - - public static class IdKeySelector<T> implements KeySelector<T, T> { + + private static class IdKeySelector<T> implements KeySelector<T, T> { @Override public T getKey(T value) { return value; http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index 76480ba..1532741 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.client.JobExecutionException; import 
org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.types.Value; - import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -41,11 +41,15 @@ import java.io.IOException; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Test for proper error messages in case user-defined serialization is broken + * and detected in the network stack. + */ @SuppressWarnings("serial") public class CustomSerializationITCase extends TestLogger { private static final int PARLLELISM = 5; - + private static LocalFlinkMiniCluster cluster; private static TestEnvironment env; @@ -66,13 +70,13 @@ public class CustomSerializationITCase extends TestLogger { cluster.shutdown(); cluster = null; } - + @Test public void testIncorrectSerializer1() { try { env.setParallelism(PARLLELISM); env.getConfig().disableSysoutLogging(); - + env .generateSequence(1, 10 * PARLLELISM) .map(new MapFunction<Long, ConsumesTooMuch>() { @@ -83,7 +87,7 @@ public class CustomSerializationITCase extends TestLogger { }) .rebalance() .output(new DiscardingOutputFormat<ConsumesTooMuch>()); - + env.execute(); } catch (JobExecutionException e) { @@ -186,11 +190,14 @@ public class CustomSerializationITCase extends TestLogger { fail(e.getMessage()); } } - + // ------------------------------------------------------------------------ // Custom Data Types with broken Serialization Logic // ------------------------------------------------------------------------ - + + /** + * {@link Value} reading more data than written. + */ public static class ConsumesTooMuch implements Value { @Override @@ -206,6 +213,9 @@ public class CustomSerializationITCase extends TestLogger { } } + /** + * {@link Value} reading more buffers than written. 
+ */ public static class ConsumesTooMuchSpanning implements Value { @Override @@ -221,6 +231,9 @@ public class CustomSerializationITCase extends TestLogger { } } + /** + * {@link Value} reading less data than written. + */ public static class ConsumesTooLittle implements Value { @Override @@ -236,6 +249,9 @@ public class CustomSerializationITCase extends TestLogger { } } + /** + * {@link Value} reading fewer buffers than written. + */ public static class ConsumesTooLittleSpanning implements Value { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java index fa1fcb6..c004759 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java @@ -21,20 +21,23 @@ package org.apache.flink.test.misc; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets; import org.junit.Assert; import org.junit.Test; +/** + * Test TypeInfo serializer tree. 
+ */ public class GenericTypeInfoTest { @Test public void testSerializerTree() { @SuppressWarnings("unchecked") - TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = - (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) + TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = + (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class); - + String serTree = Utils.getSerializerTree(ti); // We can not test against the entire output because the fields of 'String' differ // between java versions @@ -67,7 +70,7 @@ public class GenericTypeInfoTest { " lowestSetBit:int\n" + " firstNonzeroIntNum:int\n" + " mixed:java.util.List\n" + - " makeMeGeneric:org.apache.flink.test.javaApiOperators.util.CollectionDataSets$PojoWithDateAndEnum\n" + + " makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum\n" + " group:java.lang.String\n")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 7dab0f1..00b4485 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -30,24 +30,26 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; - import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.Collector; - import org.apache.flink.util.TestLogger; + import 
org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the system behavior in multiple corner cases * - when null records are passed through the system. * - when disjoint dataflows are executed * - when accumulators are used chained after a non-udf operator. - * - * The tests are bundled into one class to reuse the same test cluster. This speeds + * + * <p>The tests are bundled into one class to reuse the same test cluster. This speeds * up test execution, as the majority of the test time goes usually into starting/stopping the * test cluster. */ @@ -59,7 +61,7 @@ public class MiscellaneousIssuesITCase extends TestLogger { private static LocalFlinkMiniCluster cluster; private static TestEnvironment env; - + @BeforeClass public static void startCluster() { Configuration config = new Configuration(); @@ -72,13 +74,13 @@ public class MiscellaneousIssuesITCase extends TestLogger { env = new TestEnvironment(cluster, PARALLELISM, false); } - + @AfterClass public static void shutdownCluster() { cluster.shutdown(); cluster = null; } - + @Test public void testNullValues() { try { @@ -128,13 +130,13 @@ public class MiscellaneousIssuesITCase extends TestLogger { @Test public void testAccumulatorsAfterNoOp() { - - final String ACC_NAME = "test_accumulator"; - + + final String accName = "test_accumulator"; + try { env.setParallelism(6); env.getConfig().disableSysoutLogging(); - + env.generateSequence(1, 1000000) .rebalance() .flatMap(new RichFlatMapFunction<Long, Long>() { @@ -143,7 +145,7 @@ public class MiscellaneousIssuesITCase extends TestLogger { @Override public void open(Configuration parameters) { - counter = getRuntimeContext().getLongCounter(ACC_NAME); + counter = getRuntimeContext().getLongCounter(accName); } @Override @@ 
-154,8 +156,8 @@ public class MiscellaneousIssuesITCase extends TestLogger { .output(new DiscardingOutputFormat<Long>()); JobExecutionResult result = env.execute(); - - assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME)); + + assertEquals(1000000L, result.getAllAccumulatorResults().get(accName)); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index a5103cc..fd556d5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -32,35 +32,41 @@ import org.apache.flink.examples.java.clustering.KMeans; import org.apache.flink.examples.java.clustering.util.KMeansData; import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; - import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.test.util.TestEnvironment; import org.apache.flink.util.TestLogger; + import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Test that runs an iterative job after a failure in another iterative job. + * This test validates that task slots in co-location constraints are properly + * freed in the presence of failures. 
+ */ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { private static final int PARALLELISM = 16; @Test public void testSuccessfulProgramAfterFailure() { LocalFlinkMiniCluster cluster = null; - + try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); - + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); TestEnvironment env = new TestEnvironment(cluster, PARALLELISM, false); - + try { runConnectedComponents(env); } @@ -68,7 +74,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { e.printStackTrace(); fail("Program Execution should have succeeded."); } - + try { runKMeans(env); fail("This program execution should have failed."); @@ -76,7 +82,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { catch (JobExecutionException e) { assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers")); } - + try { runConnectedComponents(env); } @@ -95,9 +101,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { } } } - + private static void runConnectedComponents(ExecutionEnvironment env) throws Exception { - + env.setParallelism(PARALLELISM); env.getConfig().disableSysoutLogging(); @@ -166,7 +172,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { .map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); clusteredPoints.output(new DiscardingOutputFormat<Tuple2<Integer, KMeans.Point>>()); - + env.execute("KMeans Example"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java ---------------------------------------------------------------------- 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java new file mode 100644 index 0000000..b4bd213 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.ValueCollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** + * Integration tests for aggregations. 
+ */ +@RunWith(Parameterized.class) +public class AggregateITCase extends MultipleProgramsTestBase { + + public AggregateITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testFullAggregate() throws Exception { + /* + * Full Aggregate + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Integer, Long>> aggregateDs = ds + .aggregate(Aggregations.SUM, 0) + .and(Aggregations.MAX, 1) + .project(0, 1); + + List<Tuple2<Integer, Long>> result = aggregateDs.collect(); + + String expected = "231,6\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testFullAggregateOfMutableValueTypes() throws Exception { + /* + * Full Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds + .aggregate(Aggregations.SUM, 0) + .and(Aggregations.MAX, 1) + .project(0, 1); + + List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect(); + + String expected = "231,6\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testGroupedAggregate() throws Exception { + /* + * Grouped Aggregate + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.SUM, 0) + .project(1, 0); + + List<Tuple2<Long, Integer>> result = aggregateDs.collect(); + + String expected = "1,1\n" + + "2,5\n" + + "3,15\n" + + "4,34\n" + + "5,65\n" + + "6,111\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testGroupedAggregateOfMutableValueTypes() throws 
Exception { + /* + * Grouped Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.SUM, 0) + .project(1, 0); + + List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect(); + + String expected = "1,1\n" + + "2,5\n" + + "3,15\n" + + "4,34\n" + + "5,65\n" + + "6,111\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testNestedAggregate() throws Exception { + /* + * Nested Aggregate + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.MIN, 0) + .aggregate(Aggregations.MIN, 0) + .project(0); + + List<Tuple1<Integer>> result = aggregateDs.collect(); + + String expected = "1\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testNestedAggregateOfMutableValueTypes() throws Exception { + /* + * Nested Aggregate of mutable value types + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1) + .aggregate(Aggregations.MIN, 0) + .aggregate(Aggregations.MIN, 0) + .project(0); + + List<Tuple1<IntValue>> result = aggregateDs.collect(); + + String expected = "1\n"; + + compareResultAsTuples(result, expected); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java new file mode 100644 index 0000000..4108b24 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import org.junit.Assert; + +/** + * Integration tests for {@link CoGroupFunction} with group sorting (sortFirstGroup / sortSecondGroup) on both inputs.
+ */ +@SuppressWarnings({"serial", "unchecked"}) +public class CoGroupGroupSortITCase extends JavaProgramTestBase { + + @Override + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Long, Long>> input1 = env.fromElements( + new Tuple2<Long, Long>(0L, 10L), + new Tuple2<Long, Long>(0L, 8L), + new Tuple2<Long, Long>(0L, 9L), + new Tuple2<Long, Long>(0L, 2L), + new Tuple2<Long, Long>(0L, 1L), + new Tuple2<Long, Long>(1L, 10L), + new Tuple2<Long, Long>(1L, 8L), + new Tuple2<Long, Long>(1L, 9L), + new Tuple2<Long, Long>(1L, 7L)); + + DataSet<TestPojo> input2 = env.fromElements( + new TestPojo(0L, 10L, 3L), + new TestPojo(0L, 8L, 3L), + new TestPojo(0L, 10L, 1L), + new TestPojo(0L, 9L, 0L), + new TestPojo(0L, 8L, 2L), + new TestPojo(0L, 8L, 4L), + new TestPojo(1L, 10L, 3L), + new TestPojo(1L, 8L, 3L), + new TestPojo(1L, 10L, 1L), + new TestPojo(1L, 9L, 0L), + new TestPojo(1L, 8L, 2L), + new TestPojo(1L, 8L, 4L)); + + input1.coGroup(input2) + .where(1).equalTo("b") + .sortFirstGroup(0, Order.DESCENDING) + .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) + + .with(new ValidatingCoGroup()) + .output(new DiscardingOutputFormat<NullValue>()); + + env.execute(); + } + + private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> { + + @Override + public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception { + // validate the tuple input: grouped on field 1 (constant within a group), so the group sort must yield field 0 descending + { + long lastValue = Long.MAX_VALUE; + + for (Tuple2<Long, Long> t : first) { + long current = t.f0; + Assert.assertTrue(current <= lastValue); + lastValue = current; + } + } + + // validate the pojo input: field "c" ascending, ties on "c" ordered by field "a" descending + { + TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE); + + for (TestPojo current : second) { + Assert.assertTrue(current.c >= lastValue.c); + 
+ Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a); + + lastValue = current; + } + } + + } + } + + /** + * Test POJO; field "b" is the co-group key and fields "c" and "a" are the group-sort keys. + */ + public static class TestPojo implements Cloneable { + public long a; + public long b; + public long c; + + public TestPojo() {} + + public TestPojo(long a, long b, long c) { + this.a = a; + this.b = b; + this.c = c; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java new file mode 100644 index 0000000..453f525 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java @@ -0,0 +1,989 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.operators.util.CollectionDataSets.POJO; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Integration tests for {@link CoGroupFunction} and {@link RichCoGroupFunction}. 
+ */ +@RunWith(Parameterized.class) +public class CoGroupITCase extends MultipleProgramsTestBase { + + public CoGroupITCase(TestExecutionMode mode){ + super(mode); + } + + /* + * CoGroup on tuples with key field selector + */ + @Test + public void testCoGroupTuplesWithKeyFieldSelector() throws Exception { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup()); + + List<Tuple2<Integer, Integer>> result = coGroupDs.collect(); + + String expected = "1,0\n" + + "2,6\n" + + "3,24\n" + + "4,60\n" + + "5,120\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception { + /* + * CoGroup on two custom type inputs with key extractors + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new + KeySelector5()).with(new CustomTypeCoGroup()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "1,0,test\n" + + "2,6,test\n" + + "3,24,test\n" + + "4,60,test\n" + + "5,120,test\n" + + "6,210,test\n"; + + compareResultAsText(result, expected); + } + + private static class KeySelector4 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + private static class KeySelector5 implements KeySelector<CustomType, Integer> { + private static final long 
serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception { + /* + * check correctness of cogroup if UDF returns left input objects + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft()); + + List<Tuple3<Integer, Long, String>> result = coGroupDs.collect(); + + String expected = "1,1,Hi\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + + "5,3,I am fine.\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception { + /* + * check correctness of cogroup if UDF returns right input objects + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight()); + + List<Tuple5<Integer, Long, Integer, String, Long>> result = coGroupDs.collect(); + + String expected = "1,1,0,Hallo,1\n" + + "2,2,1,Hallo Welt,2\n" + + "2,3,2,Hallo Welt wie,1\n" + + "3,4,3,Hallo Welt wie gehts?,2\n" + + "3,5,4,ABC,2\n" + + "3,6,5,BCD,3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupWithBroadcastSet() throws Exception { + /* + * Reduce with broadcast set + */ + + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints"); + + List<Tuple3<Integer, Integer, Integer>> result = coGroupDs.collect(); + + String expected = "1,0,55\n" + + "2,6,55\n" + + "3,24,55\n" + + "4,60,55\n" + + "5,120,55\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() + throws Exception { + /* + * CoGroup on a tuple input with key field selector and a custom type input with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new + KeySelector2()).with(new MixedCoGroup()); + + List<Tuple3<Integer, Long, String>> result = coGroupDs.collect(); + + String expected = "0,1,test\n" + + "1,2,test\n" + + "2,5,test\n" + + "3,15,test\n" + + "4,33,test\n" + + "5,63,test\n" + + "6,109,test\n" + + "7,4,test\n" + + "8,4,test\n" + + "9,4,test\n" + + "10,5,test\n" + + "11,5,test\n" + + "12,5,test\n" + + "13,5,test\n" + + "14,5,test\n"; + + compareResultAsTuples(result, expected); + } + + private static class KeySelector2 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void 
testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector() + throws Exception { + /* + * CoGroup on a tuple input with key field selector and a custom type input with key extractor + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with + (new MixedCoGroup2()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "0,1,test\n" + + "1,2,test\n" + + "2,5,test\n" + + "3,15,test\n" + + "4,33,test\n" + + "5,63,test\n" + + "6,109,test\n" + + "7,4,test\n" + + "8,4,test\n" + + "9,4,test\n" + + "10,5,test\n" + + "11,5,test\n" + + "12,5,test\n" + + "13,5,test\n" + + "14,5,test\n"; + + compareResultAsText(result, expected); + } + + private static class KeySelector3 implements KeySelector<CustomType, Integer> { + private static final long serialVersionUID = 1L; + @Override + public Integer getKey(CustomType in) { + return in.myInt; + } + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception { + /* + * CoGroup with multiple key fields + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). 
+ where(0, 4).equalTo(0, 1).with(new Tuple5Tuple3CoGroup()); + + List<Tuple3<Integer, Long, String>> result = coGrouped.collect(); + + String expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception { + /* + * CoGroup with multiple key fields + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). + where(new KeySelector7()). + equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup()); + + List<Tuple3<Integer, Long, String>> result = coGrouped.collect(); + + String expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception { + /* + * CoGroup with multiple key fields, test working closure cleaner for inner classes + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). 
+ where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, + Tuple2<Integer, Long>>() { + @Override + public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + }). + equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() { + + @Override + public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) { + return new Tuple2<>(t.f0, t.f1); + } + }). + with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() { + @Override + public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple3<Integer, Long, String>> second, + Collector<Tuple3<Integer, Long, String>> out) { + List<String> strs = new ArrayList<>(); + + for (Tuple5<Integer, Long, Integer, String, Long> t : first) { + strs.add(t.f3); + } + + for (Tuple3<Integer, Long, String> t : second) { + for (String s : strs) { + out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s)); + } + } + } + }); + + List<Tuple3<Integer, Long, String>> result = coGrouped.collect(); + + String expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception { + /* + * CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner + * classes. 
+ */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableClosureCleaner(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + boolean correctExceptionTriggered = false; + try { + DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2). + where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, + Tuple2<Integer, Long>>() { + @Override + public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + }). + equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() { + + @Override + public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) { + return new Tuple2<Integer, Long>(t.f0, t.f1); + } + }). + with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() { + @Override + public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple3<Integer, Long, String>> second, + Collector<Tuple3<Integer, Long, String>> out) { + List<String> strs = new ArrayList<String>(); + + for (Tuple5<Integer, Long, Integer, String, Long> t : first) { + strs.add(t.f3); + } + + for (Tuple3<Integer, Long, String> t : second) { + for (String s : strs) { + out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s)); + } + } + } + }); + } catch (InvalidProgramException ex) { + correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException); + } + Assert.assertTrue(correctExceptionTriggered); + + } + + private static class KeySelector7 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, + Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public 
Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) { + return new Tuple2<Integer, Long>(t.f0, t.f4); + } + } + + private static class KeySelector8 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) { + return new Tuple2<Integer, Long>(t.f0, t.f1); + } + } + + @Test + public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception { + /* + * CoGroup on two custom type inputs using expression keys + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "1,0,test\n" + + "2,6,test\n" + + "3,24,test\n" + + "4,60,test\n" + + "5,120,test\n" + + "6,210,test\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws + Exception { + /* + * CoGroup on two custom type inputs using expression keys + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + + compareResultAsText(result, expected); + } + + private static class CoGroup1 implements 
CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for (POJO p : first) { + for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + } + + @Test + public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is unnecessary complicated (Tuple1) ;) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector6()).equalTo(6).with(new CoGroup3()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + + compareResultAsText(result, expected); + } + + private static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple1<Long> getKey(POJO value) + throws Exception { + return new Tuple1<Long>(value.nestedPojo.longNumber); + } + } + + private static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer, + String, Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, 
String, Long>> second, + Collector<CustomType> out) throws Exception { + for (POJO p : first) { + for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + } + + @Test + public void testCoGroupFieldSelectorAndKeySelector() throws Exception { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is simple here + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector1()).equalTo(6).with(new CoGroup2()); + + List<CustomType> result = coGroupDs.collect(); + + String expected = "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + + compareResultAsText(result, expected); + } + + @Test + public void testCoGroupWithAtomicType1() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Integer> ds2 = env.fromElements(0, 1, 2); + + DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new CoGroupAtomic1()); + + List<Tuple3<Integer, Long, String>> result = coGroupDs.collect(); + + String expected = "(1,1,Hi)\n" + + "(2,2,Hello)"; + + compareResultAsText(result, expected); + } + + @Test + public void testCoGroupWithAtomicType2() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Integer> ds1 = env.fromElements(0, 1, 2); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); + 
DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new CoGroupAtomic2()); + + List<Tuple3<Integer, Long, String>> result = coGroupDs.collect(); + + String expected = "(1,1,Hi)\n" + + "(2,2,Hello)"; + + compareResultAsText(result, expected); + } + + @Test + public void testCoGroupWithRangePartitioning() throws Exception { + /* + * Test coGroup on tuples with multiple key field positions and same customized distribution + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env); + + env.setParallelism(4); + TestDistribution testDis = new TestDistribution(); + DataSet<Tuple3<Integer, Long, String>> coGrouped = + DataSetUtils.partitionByRange(ds1, testDis, 0, 4) + .coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1)) + .where(0, 4) + .equalTo(0, 1) + .with(new Tuple5Tuple3CoGroup()); + + List<Tuple3<Integer, Long, String>> result = coGrouped.collect(); + + String expected = "1,1,Hallo\n" + + "2,2,Hallo Welt\n" + + "3,2,Hallo Welt wie gehts?\n" + + "3,2,ABC\n" + + "5,3,HIJ\n" + + "5,3,IJK\n"; + + compareResultAsTuples(result, expected); + } + + // -------------------------------------------------------------------------------------------- + // UDF classes + // -------------------------------------------------------------------------------------------- + + /** Uses {@code nestedPojo.longNumber} of a {@link POJO} as the grouping key. */ + private static class KeySelector1 implements KeySelector<POJO, Long> { + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(POJO value) + throws Exception { + return value.nestedPojo.longNumber; + } + } + + /** Asserts that both sides share the key and emits {@code (-1, nestedPojo.longNumber, "Flink")} for every pair. 
*/ + private static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String, + Integer, Integer, Long, String, Long>, CustomType> { + private static final long serialVersionUID = 1L; + + @Override + 
public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for (POJO p : first) { + for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + } + + /** Sums {@code f2} over both groups and emits {@code (f0 of the last element seen, sum)}. */ + private static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<Tuple2<Integer, Integer>> out) { + int sum = 0; + int id = 0; + + for (Tuple5<Integer, Long, Integer, String, Long> element : first) { + sum += element.f2; + id = element.f0; + } + + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { + sum += element.f2; + id = element.f0; + } + + out.collect(new Tuple2<Integer, Integer>(id, sum)); + } + } + + /** Sums {@code myLong} over both groups and emits it with the last seen {@code myInt} and the string "test". 
*/ + private static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) { + + CustomType o = new CustomType(0, 0, "test"); + + for (CustomType element : first) { + o.myInt = element.myInt; + o.myLong += element.myLong; + } + + for (CustomType element : second) { + o.myInt = element.myInt; + o.myLong += element.myLong; + } + + out.collect(o); + } + } + + /** Sums tuple {@code f0} and custom-type {@code myLong}; the id is the last seen tuple {@code f2} or custom-type {@code myInt}. */ + private static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public void 
coGroup( + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<CustomType> second, + Collector<Tuple3<Integer, Long, String>> out) throws Exception { + + long sum = 0; + int id = 0; + + for (Tuple5<Integer, Long, Integer, String, Long> element : first) { + sum += element.f0; + id = element.f2; + } + + for (CustomType element : second) { + id = element.myInt; + sum += element.myLong; + } + + out.collect(new Tuple3<Integer, Long, String>(id, sum, "test")); + } + + } + + /** Sums custom-type {@code myLong} and tuple {@code f0}; {@code myInt} is the last seen custom-type {@code myInt} or tuple {@code f2}. */ + private static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<CustomType> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<CustomType> out) { + CustomType o = new CustomType(0, 0, "test"); + + for (CustomType element : first) { + o.myInt = element.myInt; + o.myLong += element.myLong; + } + + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { + o.myInt = element.f2; + o.myLong += element.f0; + } + + out.collect(o); + + } + + } + + /** Forwards left-side elements with {@code f0 < 6}; the right side is ignored. */ + private static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, + Iterable<Tuple3<Integer, Long, String>> second, + Collector<Tuple3<Integer, Long, String>> out) { + for (Tuple3<Integer, Long, String> element : first) { + if (element.f0 < 6) { + out.collect(element); + } + } + } + } + + /** Forwards right-side elements with {@code f0 < 4}; the left side is ignored. 
*/ + private static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<Tuple5<Integer, 
Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<Tuple5<Integer, Long, Integer, String, Long>> out) { + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { + if (element.f0 < 4) { + out.collect(element); + } + } + } + } + + /** Like {@link Tuple5CoGroup}, additionally emitting the sum of the "ints" broadcast set (42 until {@code open()} runs). */ + private static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + private int broadcast = 42; + + @Override + public void open(Configuration config) { + + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + int sum = 0; + for (Integer i : ints) { + sum += i; + } + broadcast = sum; + + } + + @Override + public void coGroup( + Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, + Collector<Tuple3<Integer, Integer, Integer>> out) { + int sum = 0; + int id = 0; + + for (Tuple5<Integer, Long, Integer, String, Long> element : first) { + sum += element.f2; + id = element.f0; + } + + for (Tuple5<Integer, Long, Integer, String, Long> element : second) { + sum += element.f2; + id = element.f0; + } + + out.collect(new Tuple3<Integer, Integer, Integer>(id, sum, broadcast)); + } + } + + /** Pairs the {@code (f0, f1)} of every right-side tuple with every {@code f3} string of the left group. 
*/ + private static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, + Iterable<Tuple3<Integer, Long, String>> second, + Collector<Tuple3<Integer, Long, String>> out) { + List<String> strs = new ArrayList<String>(); + + for (Tuple5<Integer, Long, Integer, String, Long> t : first) { + strs.add(t.f3); + } + + for (Tuple3<Integer, Long, String> t : second) 
{ + for (String s : strs) { + out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s)); + } + } + } + } + + /** Emits each left-side tuple once per right-side integer equal to its {@code f0}. */ + private static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception { + List<Integer> ints = new ArrayList<Integer>(); + + for (Integer i : second) { + ints.add(i); + } + + for (Tuple3<Integer, Long, String> t : first) { + for (Integer i : ints) { + if (t.f0.equals(i)) { + out.collect(t); + } + } + } + } + } + + /** Emits each right-side tuple once per left-side integer equal to its {@code f0}. */ + private static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception { + List<Integer> ints = new ArrayList<Integer>(); + + for (Integer i : first) { + ints.add(i); + } + + for (Tuple3<Integer, Long, String> t : second) { + for (Integer i : ints) { + if (t.f0.equals(i)) { + out.collect(t); + } + } + } + } + } + + /** + * Test {@link DataDistribution} with four fixed two-field {@code (Integer, Long)} bucket boundaries. 
+ */ + public static class TestDistribution implements DataDistribution { + public Object[][] boundaries = new Object[][]{ + new Object[]{2, 2L}, + new Object[]{5, 4L}, + new Object[]{10, 12L}, + new Object[]{21, 6L} + }; + + public TestDistribution() {} + + @Override + public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) { + // NOTE(review): totalNumBuckets is ignored; assumes bucketNum < boundaries.length — confirm for higher parallelism. + return boundaries[bucketNum]; + } + + @Override + public int getNumberOfFields() { + return 2; + } + + @Override + public TypeInformation[] getKeyTypes() { + return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO}; + } + + @Override + public void write(DataOutputView out) throws IOException { + // Nothing to serialize: the boundaries come from the field initializer. + } + + @Override + public void read(DataInputView in) throws IOException { + // Nothing to deserialize: the boundaries come from the field initializer. + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestDistribution; + } + + @Override + public int hashCode() { + // Must agree with equals(): all TestDistribution instances compare equal. + return TestDistribution.class.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java new file mode 100644 index 0000000..6e61f60 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.operators; + +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.RichCrossFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.operators.util.CollectionDataSets; +import org.apache.flink.test.operators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.List; + +/** + * Integration tests for {@link CrossFunction} and {@link RichCrossFunction}. 
+ */ +@RunWith(Parameterized.class) +public class CrossITCase extends MultipleProgramsTestBase { + + public CrossITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testCorrectnessOfCrossOnTwoTupleInputs() throws Exception { + /* + * check correctness of cross on two tuple inputs + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross()); + + List<Tuple2<Integer, String>> result = crossDs.collect(); + + String expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception { + /* + * check correctness of cross if UDF returns left input object + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft()); + + List<Tuple3<Integer, Long, String>> result = crossDs.collect(); + + String expected = "1,1,Hi\n" + + "1,1,Hi\n" + + "1,1,Hi\n" + + "2,2,Hello\n" + + "2,2,Hello\n" + + "2,2,Hello\n" + + "3,2,Hello world\n" + + "3,2,Hello world\n" + + "3,2,Hello world\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void 
testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception { + /* + * check correctness of cross if UDF returns right input object + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight()); + + List<Tuple5<Integer, Long, Integer, String, Long>> result = crossDs + .collect(); + + String expected = "1,1,0,Hallo,1\n" + + "1,1,0,Hallo,1\n" + + "1,1,0,Hallo,1\n" + + "2,2,1,Hallo Welt,2\n" + + "2,2,1,Hallo Welt,2\n" + + "2,2,1,Hallo Welt,2\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,3,2,Hallo Welt wie,1\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCrossWithBroadcastSet() throws Exception { + /* + * check correctness of cross with broadcast set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints"); + + List<Tuple3<Integer, Integer, Integer>> result = crossDs.collect(); + + String expected = "2,0,55\n" + + "3,0,55\n" + + "3,0,55\n" + + "3,0,55\n" + + "4,1,55\n" + + "4,2,55\n" + + "3,0,55\n" + + "4,2,55\n" + + "4,4,55\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCrossWithHuge() throws Exception { + /* + * check correctness of crossWithHuge (only 
correctness of result -> should be the same as with normal cross) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross()); + + List<Tuple2<Integer, String>> result = crossDs.collect(); + + String expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCrossWithTiny() throws Exception { + /* + * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross) + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross()); + + List<Tuple2<Integer, String>> result = crossDs.collect(); + + String expected = "0,HalloHallo\n" + + "1,HalloHallo Welt\n" + + "2,HalloHallo Welt wie\n" + + "1,Hallo WeltHallo\n" + + "2,Hallo WeltHallo Welt\n" + + "3,Hallo WeltHallo Welt wie\n" + + "2,Hallo Welt wieHallo\n" + + "3,Hallo Welt wieHallo Welt\n" + + "4,Hallo Welt wieHallo Welt wie\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testProjectCrossOnATupleInput1() throws Exception { + /* + * project cross on a tuple input 1 + */ + + final 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2) + .projectFirst(2, 1) + .projectSecond(3) + .projectFirst(0) + .projectSecond(4, 1); + + List<Tuple6<String, Long, String, Integer, Long, Long>> result = crossDs.collect(); + + String expected = "Hi,1,Hallo,1,1,1\n" + + "Hi,1,Hallo Welt,1,2,2\n" + + "Hi,1,Hallo Welt wie,1,1,3\n" + + "Hello,2,Hallo,2,1,1\n" + + "Hello,2,Hallo Welt,2,2,2\n" + + "Hello,2,Hallo Welt wie,2,1,3\n" + + "Hello world,2,Hallo,3,1,1\n" + + "Hello world,2,Hallo Welt,3,2,2\n" + + "Hello world,2,Hallo Welt wie,3,1,3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testProjectCrossOnATupleInput2() throws Exception { + /* + * project cross on a tuple input 2 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple6<String, String, Long, Long, Long, Integer>> crossDs = ds.cross(ds2) + .projectSecond(3) + .projectFirst(2, 1) + .projectSecond(4, 1) + .projectFirst(0); + + List<Tuple6<String, String, Long, Long, Long, Integer>> result = crossDs.collect(); + + String expected = "Hallo,Hi,1,1,1,1\n" + + "Hallo Welt,Hi,1,2,2,1\n" + + "Hallo Welt wie,Hi,1,1,3,1\n" + + "Hallo,Hello,2,1,1,2\n" + + "Hallo Welt,Hello,2,2,2,2\n" + + "Hallo Welt wie,Hello,2,1,3,2\n" + + "Hallo,Hello world,2,1,1,3\n" + + "Hallo Welt,Hello world,2,2,2,3\n" + + "Hallo Welt wie,Hello world,2,1,3,3\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void 
testCorrectnessOfDefaultCross() throws Exception { + /* + * check correctness of default cross + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2); + + List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> result = crossDs.collect(); + + String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" + + "(1,1,Hi),(1,1,0,Hallo,1)\n" + + "(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" + + "(2,2,Hello),(2,2,1,Hallo Welt,2)\n" + + "(2,2,Hello),(1,1,0,Hallo,1)\n" + + "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" + + "(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" + + "(3,2,Hello world),(1,1,0,Hallo,1)\n" + + "(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n"; + + compareResultAsTuples(result, expected); + } + + @Test + public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception { + /* + * check correctness of cross on two custom type inputs + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross()); + + List<CustomType> result = crossDs.collect(); + + String expected = "1,0,HiHi\n" + + "2,1,HiHello\n" + + "2,2,HiHello world\n" + + "2,1,HelloHi\n" + + "4,2,HelloHello\n" + + "4,3,HelloHello world\n" + + "2,2,Hello worldHi\n" + + "4,3,Hello worldHello\n" + + "4,4,Hello worldHello world"; + + compareResultAsText(result, expected); + } + + @Test + public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception { + /* + * 
check correctness of cross of a tuple input and a custom type input + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env); + DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross()); + + List<Tuple3<Integer, Long, String>> result = crossDs.collect(); + + String expected = "2,0,HalloHi\n" + + "3,0,HalloHello\n" + + "3,0,HalloHello world\n" + + "3,0,Hallo WeltHi\n" + + "4,1,Hallo WeltHello\n" + + "4,2,Hallo WeltHello world\n" + + "3,0,Hallo Welt wieHi\n" + + "4,2,Hallo Welt wieHello\n" + + "4,4,Hallo Welt wieHello world\n"; + + compareResultAsTuples(result, expected); + } + + // -------------------------------------------------------------------------------------------- + // UDF classes + // -------------------------------------------------------------------------------------------- + + /** Emits {@code (first.f2 + second.f2, first.f3 + second.f3)}. */ + private static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, String> cross( + Tuple5<Integer, Long, Integer, String, Long> first, + Tuple5<Integer, Long, Integer, String, Long> second) + throws Exception { + + return new Tuple2<Integer, String>(first.f2 + second.f2, first.f3 + second.f3); + } + + } + + /** Multiplies {@code myInt}, adds {@code myLong} and concatenates {@code myString} of both inputs. 
*/ + private static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> { + + private static final long serialVersionUID = 1L; + + @Override + public CustomType cross(CustomType first, CustomType second) + throws Exception { + + return new CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString); + } + + } + + /** Emits {@code (f0 + myInt, f2 * myLong, f3 + myString)}. */ + private static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple3<Integer, Long, String> cross( + 
Tuple5<Integer, Long, Integer, String, Long> first, + CustomType second) throws Exception { + + return new Tuple3<Integer, Long, String>(first.f0 + second.myInt, first.f2 * second.myLong, first.f3 + second.myString); + } + + } + + /** Returns the left input object unchanged. */ + private static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple3<Integer, Long, String> cross( + Tuple3<Integer, Long, String> first, + Tuple5<Integer, Long, Integer, String, Long> second) throws Exception { + + return first; + } + } + + /** Returns the right input object unchanged. */ + private static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple5<Integer, Long, Integer, String, Long> cross( + Tuple3<Integer, Long, String> first, + Tuple5<Integer, Long, Integer, String, Long> second) + throws Exception { + + return second; + } + + } + + /** Emits {@code (f0 sum, f2 product, sum of the "ints" broadcast set)}; the broadcast value is 42 until {@code open()} runs. */ + private static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + private int broadcast = 42; + + @Override + public void open(Configuration config) { + + Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints"); + int sum = 0; + for (Integer i : ints) { + sum += i; + } + broadcast = sum; + + } + + @Override + public Tuple3<Integer, Integer, Integer> cross( + Tuple5<Integer, Long, Integer, String, Long> first, + Tuple5<Integer, Long, Integer, String, Long> 
second) + throws Exception { + + return new Tuple3<Integer, Integer, Integer>(first.f0 + second.f0, first.f2 * second.f2, broadcast); + } + } +}