Repository: incubator-flink Updated Branches: refs/heads/master ec82d973d -> 6be855543
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala index dd1ac99..fe1dd43 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala @@ -17,10 +17,10 @@ */ package org.apache.flink.api.scala.operators +import org.apache.flink.api.scala.util.CollectionDataSets.CustomType import org.junit.Assert import org.apache.flink.api.common.InvalidProgramException import org.apache.flink.api.common.operators.Order -import org.junit.Ignore import org.junit.Test import org.apache.flink.api.scala._ @@ -96,7 +96,7 @@ class GroupingTest { } } - @Test(expected = classOf[UnsupportedOperationException]) + @Test(expected = classOf[IllegalArgumentException]) def testGroupByKeyFields2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val longDs = env.fromCollection(emptyLongData) @@ -105,7 +105,7 @@ class GroupingTest { longDs.groupBy("_1") } - @Test(expected = classOf[UnsupportedOperationException]) + @Test(expected = classOf[IllegalArgumentException]) def testGroupByKeyFields3(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val customDs = env.fromCollection(customTypeData) @@ -114,7 +114,7 @@ class GroupingTest { customDs.groupBy("_1") } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[RuntimeException]) def testGroupByKeyFields4(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tupleDs = env.fromCollection(emptyTupleData) @@ -123,7 +123,15 @@ class GroupingTest { tupleDs.groupBy("foo") } - @Ignore + @Test + def 
testGroupByKeyFields5(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val customDs = env.fromCollection(customTypeData) + + // should not work + customDs.groupBy("myInt") + } + @Test def testGroupByKeyExpressions1(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -131,24 +139,22 @@ class GroupingTest { // should work try { -// ds.groupBy("i"); + ds.groupBy("myInt") } catch { case e: Exception => Assert.fail() } } - @Ignore - @Test(expected = classOf[UnsupportedOperationException]) + @Test(expected = classOf[IllegalArgumentException]) def testGroupByKeyExpressions2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment // should not work: groups on basic type -// longDs.groupBy("l"); val longDs = env.fromCollection(emptyLongData) + longDs.groupBy("l") } - @Ignore @Test(expected = classOf[InvalidProgramException]) def testGroupByKeyExpressions3(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -158,14 +164,13 @@ class GroupingTest { customDs.groupBy(0) } - @Ignore @Test(expected = classOf[IllegalArgumentException]) def testGroupByKeyExpressions4(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = env.fromCollection(customTypeData) // should not work, non-existent field -// ds.groupBy("myNonExistent"); + ds.groupBy("myNonExistent") } @Test @@ -173,7 +178,7 @@ class GroupingTest { val env = ExecutionEnvironment.getExecutionEnvironment try { val customDs = env.fromCollection(customTypeData) - customDs.groupBy { _.l } + customDs.groupBy { _.myLong } } catch { case e: Exception => Assert.fail() http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala index cae936d..0219154 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException +import org.apache.flink.api.scala.util.CollectionDataSets.CustomType import org.junit.Assert import org.apache.flink.api.common.InvalidProgramException import org.junit.Ignore @@ -132,7 +133,7 @@ class JoinOperatorTest { ds1.join(ds2).where("_1", "_2").equalTo("_3") } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[RuntimeException]) def testJoinKeyFields4(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(emptyTupleData) @@ -142,7 +143,7 @@ class JoinOperatorTest { ds1.join(ds2).where("foo").equalTo("_1") } - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[RuntimeException]) def testJoinKeyFields5(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(emptyTupleData) @@ -152,7 +153,7 @@ class JoinOperatorTest { ds1.join(ds2).where("_1").equalTo("bar") } - @Test(expected = classOf[UnsupportedOperationException]) + @Test(expected = classOf[IllegalArgumentException]) def testJoinKeyFields6(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(emptyTupleData) @@ -162,7 +163,6 @@ class JoinOperatorTest { ds1.join(ds2).where("_2").equalTo("_1") } - @Ignore @Test def testJoinKeyExpressions1(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -171,36 +171,33 @@ class JoinOperatorTest { // should work try { -// ds1.join(ds2).where("i").equalTo("i") + ds1.join(ds2).where("myInt").equalTo("myInt") } catch { case e: Exception => 
Assert.fail() } } - @Ignore - @Test(expected = classOf[InvalidProgramException]) + @Test(expected = classOf[IncompatibleKeysException]) def testJoinKeyExpressions2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(customTypeData) val ds2 = env.fromCollection(customTypeData) // should not work, incompatible join key types -// ds1.join(ds2).where("i").equalTo("s") + ds1.join(ds2).where("myInt").equalTo("myString") } - @Ignore - @Test(expected = classOf[InvalidProgramException]) + @Test(expected = classOf[IncompatibleKeysException]) def testJoinKeyExpressions3(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(customTypeData) val ds2 = env.fromCollection(customTypeData) // should not work, incompatible number of keys -// ds1.join(ds2).where("i", "s").equalTo("i") + ds1.join(ds2).where("myInt", "myString").equalTo("myInt") } - @Ignore @Test(expected = classOf[IllegalArgumentException]) def testJoinKeyExpressions4(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -208,7 +205,7 @@ class JoinOperatorTest { val ds2 = env.fromCollection(customTypeData) // should not work, join key non-existent -// ds1.join(ds2).where("myNonExistent").equalTo("i") + ds1.join(ds2).where("myNonExistent").equalTo("i") } @Test @@ -219,7 +216,7 @@ class JoinOperatorTest { // should work try { - ds1.join(ds2).where { _.l} equalTo { _.l } + ds1.join(ds2).where { _.myLong} equalTo { _.myLong } } catch { case e: Exception => Assert.fail() @@ -234,7 +231,7 @@ class JoinOperatorTest { // should work try { - ds1.join(ds2).where { _.l }.equalTo(3) + ds1.join(ds2).where { _.myLong }.equalTo(3) } catch { case e: Exception => Assert.fail() @@ -249,7 +246,7 @@ class JoinOperatorTest { // should work try { - ds1.join(ds2).where(3).equalTo { _.l } + ds1.join(ds2).where(3).equalTo { _.myLong } } catch { case e: Exception => Assert.fail() @@ -263,7 +260,7 @@ class JoinOperatorTest { val ds2 = 
env.fromCollection(customTypeData) // should not work, incompatible types - ds1.join(ds2).where(2).equalTo { _.l } + ds1.join(ds2).where(2).equalTo { _.myLong } } @Test(expected = classOf[IncompatibleKeysException]) @@ -273,7 +270,7 @@ class JoinOperatorTest { val ds2 = env.fromCollection(customTypeData) // should not work, more than one field position key - ds1.join(ds2).where(1, 3) equalTo { _.l } + ds1.join(ds2).where(1, 3) equalTo { _.myLong } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala index a8447a9..bea91df 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.scala.operators import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction} import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.configuration.Configuration import org.apache.flink.test.util.JavaProgramTestBase import org.junit.runner.RunWith @@ -32,40 +33,18 @@ import org.apache.flink.api.scala._ object PartitionProgs { - var NUM_PROGRAMS: Int = 6 - - val tupleInput = Array( - (1, "Foo"), - (1, "Foo"), - (1, "Foo"), - (2, "Foo"), - (2, "Foo"), - (2, "Foo"), - (2, "Foo"), - (2, "Foo"), - (3, "Foo"), - (3, "Foo"), - (3, "Foo"), - (4, "Foo"), - (4, "Foo"), - (4, "Foo"), - (4, "Foo"), - (5, "Foo"), - (5, "Foo"), - (6, "Foo"), - (6, "Foo"), - (6, "Foo"), - (6, "Foo") - ) - + var NUM_PROGRAMS: Int = 7 def runProgram(progId: Int, resultPath: String, 
onCollection: Boolean): String = { progId match { case 1 => + /* + * Test hash partition by tuple field + */ val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromCollection(tupleInput) + val ds = CollectionDataSets.get3TupleDataSet(env) - val unique = ds.partitionByHash(0).mapPartition( _.map(_._1).toSet ) + val unique = ds.partitionByHash(1).mapPartition( _.map(_._2).toSet ) unique.writeAsText(resultPath) env.execute() @@ -73,16 +52,22 @@ object PartitionProgs { "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" case 2 => + /* + * Test hash partition by key selector + */ val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromCollection(tupleInput) - val unique = ds.partitionByHash( _._1 ).mapPartition( _.map(_._1).toSet ) + val ds = CollectionDataSets.get3TupleDataSet(env) + val unique = ds.partitionByHash( _._2 ).mapPartition( _.map(_._2).toSet ) unique.writeAsText(resultPath) env.execute() "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" case 3 => - val env = ExecutionEnvironment.getExecutionEnvironment + /* + * Test forced rebalancing + */ + val env = ExecutionEnvironment.getExecutionEnvironment val ds = env.generateSequence(1, 3000) val skewed = ds.filter(_ > 780) @@ -101,8 +86,8 @@ object PartitionProgs { countsInPartition.writeAsText(resultPath) env.execute() - val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10; - var result = ""; + val numPerPartition : Int = 2220 / env.getDegreeOfParallelism / 10 + var result = "" for (i <- 0 until env.getDegreeOfParallelism) { result += "(" + i + "," + numPerPartition + ")\n" } @@ -112,10 +97,12 @@ object PartitionProgs { // Verify that mapPartition operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromCollection(tupleInput) + val ds = CollectionDataSets.get3TupleDataSet(env) env.setDegreeOfParallelism(1) - val unique = ds.partitionByHash(0).setParallelism(4).mapPartition( _.map(_._1).toSet ) + val 
unique = ds.partitionByHash(1) + .setParallelism(4) + .mapPartition( _.map(_._2).toSet ) unique.writeAsText(resultPath) env.execute() @@ -126,13 +113,13 @@ object PartitionProgs { // Verify that map operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromCollection(tupleInput) + val ds = CollectionDataSets.get3TupleDataSet(env) env.setDegreeOfParallelism(1) val count = ds.partitionByHash(0).setParallelism(4).map( - new RichMapFunction[(Int, String), Tuple1[Int]] { + new RichMapFunction[(Int, Long, String), Tuple1[Int]] { var first = true - override def map(in: (Int, String)): Tuple1[Int] = { + override def map(in: (Int, Long, String)): Tuple1[Int] = { // only output one value with count 1 if (first) { first = false @@ -152,13 +139,13 @@ object PartitionProgs { // Verify that filter operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromCollection(tupleInput) + val ds = CollectionDataSets.get3TupleDataSet(env) env.setDegreeOfParallelism(1) val count = ds.partitionByHash(0).setParallelism(4).filter( - new RichFilterFunction[(Int, String)] { + new RichFilterFunction[(Int, Long, String)] { var first = true - override def filter(in: (Int, String)): Boolean = { + override def filter(in: (Int, Long, String)): Boolean = { // only output one value with count 1 if (first) { first = false @@ -175,6 +162,19 @@ object PartitionProgs { if (onCollection) "(1)\n" else "(4)\n" + case 7 => + val env = ExecutionEnvironment.getExecutionEnvironment + env.setDegreeOfParallelism(3) + val ds = CollectionDataSets.getDuplicatePojoDataSet(env) + val uniqLongs = ds + .partitionByHash("nestedPojo.longNumber") + .setParallelism(4) + .mapPartition( _.map(_.nestedPojo.longNumber).toSet ) + + uniqLongs.writeAsText(resultPath) + env.execute() + "10000\n" + "20000\n" + "30000\n" + case _ => throw new IllegalArgumentException("Invalid program id") } @@ -194,7 
+194,7 @@ class PartitionITCase(config: Configuration) extends JavaProgramTestBase(config) } protected def testProgram(): Unit = { - expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution) + expectedResult = PartitionProgs.runProgram(curProgId, resultPath, isCollectionExecution) } protected override def postSubmit(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala index d33da41..2b2d3a9 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala @@ -44,7 +44,9 @@ class CustomType(var myField1: String, var myField2: Int) { } } -class MyObject[A](var a: A) +class MyObject[A](var a: A) { + def this() { this(null.asInstanceOf[A]) } +} class TypeInformationGenTest { @@ -139,7 +141,7 @@ class TypeInformationGenTest { Assert.assertFalse(ti.isBasicType) Assert.assertFalse(ti.isTupleType) - Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[_]]) + Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]]) Assert.assertEquals(ti.getTypeClass, classOf[CustomType]) } @@ -152,7 +154,7 @@ class TypeInformationGenTest { val tti = ti.asInstanceOf[TupleTypeInfoBase[_]] Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass) Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass) - Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[GenericTypeInfo[_]]) + Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]]) Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass) } @@ -235,7 +237,7 @@ class 
TypeInformationGenTest { def testParamertizedCustomObject(): Unit = { val ti = createTypeInformation[MyObject[String]] - Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[_]]) + Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]]) } @Test http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6be85554/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala new file mode 100644 index 0000000..60f86a0 --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.util + +import org.apache.hadoop.io.IntWritable + +import org.apache.flink.api.scala._ + +import scala.collection.mutable +import scala.util.Random + +/** + * ################################################################################################# + * + * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
+ * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! + * + * ################################################################################################# + */ +object CollectionDataSets { + def get3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = { + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 1L, "Hi")) + data.+=((2, 2L, "Hello")) + data.+=((3, 2L, "Hello world")) + data.+=((4, 3L, "Hello world, how are you?")) + data.+=((5, 3L, "I am fine.")) + data.+=((6, 3L, "Luke Skywalker")) + data.+=((7, 4L, "Comment#1")) + data.+=((8, 4L, "Comment#2")) + data.+=((9, 4L, "Comment#3")) + data.+=((10, 4L, "Comment#4")) + data.+=((11, 5L, "Comment#5")) + data.+=((12, 5L, "Comment#6")) + data.+=((13, 5L, "Comment#7")) + data.+=((14, 5L, "Comment#8")) + data.+=((15, 5L, "Comment#9")) + data.+=((16, 6L, "Comment#10")) + data.+=((17, 6L, "Comment#11")) + data.+=((18, 6L, "Comment#12")) + data.+=((19, 6L, "Comment#13")) + data.+=((20, 6L, "Comment#14")) + data.+=((21, 6L, "Comment#15")) + Random.shuffle(data) + env.fromCollection(Random.shuffle(data)) + } + + def getSmall3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = { + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 1L, "Hi")) + data.+=((2, 2L, "Hello")) + data.+=((3, 2L, "Hello world")) + env.fromCollection(Random.shuffle(data)) + } + + def get5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = { + val data = new mutable.MutableList[(Int, Long, Int, String, Long)] + data.+=((1, 1L, 0, "Hallo", 1L)) + data.+=((2, 2L, 1, "Hallo Welt", 2L)) + data.+=((2, 3L, 2, "Hallo Welt wie", 1L)) + data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L)) + data.+=((3, 5L, 4, "ABC", 2L)) + data.+=((3, 6L, 5, "BCD", 3L)) + data.+=((4, 7L, 6, "CDE", 2L)) + data.+=((4, 8L, 7, "DEF", 1L)) + data.+=((4, 9L, 8, "EFG", 1L)) + data.+=((4, 10L, 9, "FGH", 2L)) + data.+=((5, 11L, 10, "GHI", 1L)) + 
data.+=((5, 12L, 11, "HIJ", 3L)) + data.+=((5, 13L, 12, "IJK", 3L)) + data.+=((5, 14L, 13, "JKL", 2L)) + data.+=((5, 15L, 14, "KLM", 2L)) + env.fromCollection(Random.shuffle(data)) + } + + def getSmall5TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, Int, String, Long)] = { + val data = new mutable.MutableList[(Int, Long, Int, String, Long)] + data.+=((1, 1L, 0, "Hallo", 1L)) + data.+=((2, 2L, 1, "Hallo Welt", 2L)) + data.+=((2, 3L, 2, "Hallo Welt wie", 1L)) + env.fromCollection(Random.shuffle(data)) + } + + def getSmallNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = { + val data = new mutable.MutableList[((Int, Int), String)] + data.+=(((1, 1), "one")) + data.+=(((2, 2), "two")) + data.+=(((3, 3), "three")) + env.fromCollection(Random.shuffle(data)) + } + + def getGroupSortedNestedTupleDataSet(env: ExecutionEnvironment): DataSet[((Int, Int), String)] = { + val data = new mutable.MutableList[((Int, Int), String)] + data.+=(((1, 3), "a")) + data.+=(((1, 2), "a")) + data.+=(((2, 1), "a")) + data.+=(((2, 2), "b")) + data.+=(((3, 3), "c")) + data.+=(((3, 6), "c")) + data.+=(((4, 9), "c")) + env.fromCollection(Random.shuffle(data)) + } + + def getStringDataSet(env: ExecutionEnvironment): DataSet[String] = { + val data = new mutable.MutableList[String] + data.+=("Hi") + data.+=("Hello") + data.+=("Hello world") + data.+=("Hello world, how are you?") + data.+=("I am fine.") + data.+=("Luke Skywalker") + data.+=("Random comment") + data.+=("LOL") + env.fromCollection(Random.shuffle(data)) + } + + def getIntDataSet(env: ExecutionEnvironment): DataSet[Int] = { + val data = new mutable.MutableList[Int] + data.+=(1) + data.+=(2) + data.+=(2) + data.+=(3) + data.+=(3) + data.+=(3) + data.+=(4) + data.+=(4) + data.+=(4) + data.+=(4) + data.+=(5) + data.+=(5) + data.+=(5) + data.+=(5) + data.+=(5) + env.fromCollection(Random.shuffle(data)) + } + + def getCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = { + val data = 
new mutable.MutableList[CustomType] + data.+=(new CustomType(1, 0L, "Hi")) + data.+=(new CustomType(2, 1L, "Hello")) + data.+=(new CustomType(2, 2L, "Hello world")) + data.+=(new CustomType(3, 3L, "Hello world, how are you?")) + data.+=(new CustomType(3, 4L, "I am fine.")) + data.+=(new CustomType(3, 5L, "Luke Skywalker")) + data.+=(new CustomType(4, 6L, "Comment#1")) + data.+=(new CustomType(4, 7L, "Comment#2")) + data.+=(new CustomType(4, 8L, "Comment#3")) + data.+=(new CustomType(4, 9L, "Comment#4")) + data.+=(new CustomType(5, 10L, "Comment#5")) + data.+=(new CustomType(5, 11L, "Comment#6")) + data.+=(new CustomType(5, 12L, "Comment#7")) + data.+=(new CustomType(5, 13L, "Comment#8")) + data.+=(new CustomType(5, 14L, "Comment#9")) + data.+=(new CustomType(6, 15L, "Comment#10")) + data.+=(new CustomType(6, 16L, "Comment#11")) + data.+=(new CustomType(6, 17L, "Comment#12")) + data.+=(new CustomType(6, 18L, "Comment#13")) + data.+=(new CustomType(6, 19L, "Comment#14")) + data.+=(new CustomType(6, 20L, "Comment#15")) + env.fromCollection(Random.shuffle(data)) + } + + def getSmallCustomTypeDataSet(env: ExecutionEnvironment): DataSet[CustomType] = { + val data = new mutable.MutableList[CustomType] + data.+=(new CustomType(1, 0L, "Hi")) + data.+=(new CustomType(2, 1L, "Hello")) + data.+=(new CustomType(2, 2L, "Hello world")) + env.fromCollection(Random.shuffle(data)) + } + + def getSmallTuplebasedPojoMatchingDataSet(env: ExecutionEnvironment): + DataSet[(Int, String, Int, Int, Long, String, Long)] = { + val data = new mutable.MutableList[(Int, String, Int, Int, Long, String, Long)] + data.+=((1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=((2, "Second", 20, 200, 2000L, "Two", 20000L)) + data.+=((3, "Third", 30, 300, 3000L, "Three", 30000L)) + env.fromCollection(Random.shuffle(data)) + } + + def getSmallPojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = { + val data = new mutable.MutableList[POJO] + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) 
+ data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)) + data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)) + env.fromCollection(Random.shuffle(data)) + } + + def getDuplicatePojoDataSet(env: ExecutionEnvironment): DataSet[POJO] = { + val data = new mutable.MutableList[POJO] + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)) + data.+=(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)) + data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)) + data.+=(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)) + env.fromCollection(data) + } + + def getCrazyNestedDataSet(env: ExecutionEnvironment): DataSet[CrazyNested] = { + val data = new mutable.MutableList[CrazyNested] + data.+=(new CrazyNested("aa")) + data.+=(new CrazyNested("bb")) + data.+=(new CrazyNested("bb")) + data.+=(new CrazyNested("cc")) + data.+=(new CrazyNested("cc")) + data.+=(new CrazyNested("cc")) + env.fromCollection(data) + } + + def getPojoContainingTupleAndWritable(env: ExecutionEnvironment): DataSet[CollectionDataSets + .PojoContainingTupleAndWritable] = { + val data = new + mutable.MutableList[PojoContainingTupleAndWritable] + data.+=(new PojoContainingTupleAndWritable(1, 10L, 100L)) + data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L)) + data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L)) + data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L)) + data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L)) + data.+=(new PojoContainingTupleAndWritable(2, 20L, 200L)) + env.fromCollection(data) + } + + def getTupleContainingPojos(env: ExecutionEnvironment): DataSet[(Int, CrazyNested, POJO)] = { + val data = new mutable.MutableList[(Int, CrazyNested, POJO)] + data.+=(( + 1, + new CrazyNested("one", 
"uno", 1L), + new POJO(1, "First", 10, 100, 1000L, "One", 10000L))) + data.+=(( + 1, + new CrazyNested("one", "uno", 1L), + new POJO(1, "First", 10, 100, 1000L, "One", 10000L))) + data.+=(( + 1, + new CrazyNested("one", "uno", 1L), + new POJO(1, "First", 10, 100, 1000L, "One", 10000L))) + data.+=(( + 2, + new CrazyNested("two", "duo", 2L), + new POJO(1, "First", 10, 100, 1000L, "One", 10000L))) + env.fromCollection(data) + } + + def getPojoWithMultiplePojos(env: ExecutionEnvironment): DataSet[CollectionDataSets + .PojoWithMultiplePojos] = { + val data = new mutable.MutableList[CollectionDataSets + .PojoWithMultiplePojos] + data.+=(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1)) + data.+=(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)) + data.+=(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3)) + env.fromCollection(data) + } + + + class CustomType(var myInt: Int, var myLong: Long, var myString: String) { + def this() { + this(0, 0, "") + } + + override def toString: String = { + myInt + "," + myLong + "," + myString + } + } + + class POJO( + var number: Int, + var str: String, + var nestedTupleWithCustom: (Int, CustomType), + var nestedPojo: NestedPojo) { + def this() { + this(0, "", null, null) + } + + def this(i0: Int, s0: String, i1: Int, i2: Int, l0: Long, s1: String, l1: Long) { + this(i0, s0, (i1, new CustomType(i2, l0, s1)), new NestedPojo(l1)) + } + + override def toString: String = { + number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber + } + + @transient var ignoreMe: Long = 1L + } + + class NestedPojo(var longNumber: Long) { + def this() { + this(0) + } + } + + class CrazyNested(var nest_Lvl1: CrazyNestedL1, var something: Long) { + def this() { + this(new CrazyNestedL1, 0) + } + + def this(set: String) { + this() + nest_Lvl1 = new CrazyNestedL1 + nest_Lvl1.nest_Lvl2 = new CrazyNestedL2 + nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3 + nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4 + 
nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set + } + + def this(set: String, second: String, s: Long) { + this(set) + something = s + nest_Lvl1.a = second + } + } + + class CrazyNestedL1 { + var a: String = null + var b: Int = 0 + var nest_Lvl2: CrazyNestedL2 = null + } + + class CrazyNestedL2 { + var nest_Lvl3: CrazyNestedL3 = null + } + + class CrazyNestedL3 { + var nest_Lvl4: CrazyNestedL4 = null + } + + class CrazyNestedL4 { + var f1nal: String = null + } + + class PojoContainingTupleAndWritable( + var someInt: Int, + var someString: String, + var hadoopFan: IntWritable, + var theTuple: (Long, Long)) { + def this() { + this(0, "", new IntWritable(0), (0, 0)) + } + + def this(i: Int, l1: Long, l2: Long) { + this() + hadoopFan = new IntWritable(i) + someInt = i + theTuple = (l1, l2) + } + + } + + class Pojo1 { + var a: String = null + var b: String = null + } + + class Pojo2 { + var a2: String = null + var b2: String = null + } + + class PojoWithMultiplePojos { + + def this(a: String, b: String, a1: String, b1: String, i0: Int) { + this() + p1 = new Pojo1 + p1.a = a + p1.b = b + p2 = new Pojo2 + p2.a2 = a1 + p2.b2 = b1 + this.i0 = i0 + } + + var p1: Pojo1 = null + var p2: Pojo2 = null + var i0: Int = 0 + } + +} +
