http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java deleted file mode 100644 index ba48e12..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ /dev/null @@ -1,725 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.javaApiOperators.util; - -import java.io.File; -import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.hadoop.io.IntWritable; - -import scala.math.BigInt; - -/** - * ####################################################################################################### - * - * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. - * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! - * - * ####################################################################################################### - */ -public class CollectionDataSets { - - public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) { - - List<Tuple3<Integer, Long, String>> data = new ArrayList<>(); - data.add(new Tuple3<>(1, 1L, "Hi")); - data.add(new Tuple3<>(2, 2L, "Hello")); - data.add(new Tuple3<>(3, 2L, "Hello world")); - data.add(new Tuple3<>(4, 3L, "Hello world, how are you?")); - data.add(new Tuple3<>(5, 3L, "I am fine.")); - data.add(new Tuple3<>(6, 3L, "Luke Skywalker")); - data.add(new Tuple3<>(7, 4L, "Comment#1")); - data.add(new Tuple3<>(8, 4L, "Comment#2")); - data.add(new Tuple3<>(9, 4L, "Comment#3")); - data.add(new Tuple3<>(10, 4L, "Comment#4")); - data.add(new Tuple3<>(11, 5L, "Comment#5")); - data.add(new Tuple3<>(12, 5L, "Comment#6")); - data.add(new Tuple3<>(13, 5L, "Comment#7")); - data.add(new Tuple3<>(14, 5L, "Comment#8")); - data.add(new Tuple3<>(15, 5L, "Comment#9")); - data.add(new Tuple3<>(16, 6L, "Comment#10")); - data.add(new Tuple3<>(17, 6L, "Comment#11")); - data.add(new Tuple3<>(18, 6L, "Comment#12")); - data.add(new Tuple3<>(19, 6L, "Comment#13")); - data.add(new Tuple3<>(20, 6L, "Comment#14")); - data.add(new Tuple3<>(21, 6L, "Comment#15")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env) { - - List<Tuple3<Integer, Long, String>> data = new ArrayList<>(); - data.add(new Tuple3<>(1, 1L, "Hi")); - data.add(new Tuple3<>(2, 2L, "Hello")); - data.add(new Tuple3<>(3, 2L, "Hello world")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) { - - List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>(); - data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L)); - data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L)); - data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L)); - data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L)); - data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L)); - data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L)); - data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L)); - data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L)); - data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L)); - data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L)); - data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L)); - data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L)); - data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L)); - data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L)); - data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L)); - - Collections.shuffle(data); - - TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env) { - - List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>(); - data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L)); - data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L)); - data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L)); - - Collections.shuffle(data); - - TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) { - - List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>(); - data.add(new Tuple2<>(new Tuple2<>(1, 1), "one")); - data.add(new Tuple2<>(new Tuple2<>(2, 2), "two")); - data.add(new Tuple2<>(new Tuple2<>(3, 3), "three")); - - TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) { - - List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>(); - data.add(new Tuple2<>(new Tuple2<>(1, 3), "a")); - data.add(new Tuple2<>(new Tuple2<>(1, 2), "a")); - data.add(new Tuple2<>(new Tuple2<>(2, 1), "a")); - data.add(new Tuple2<>(new Tuple2<>(2, 2), "b")); - data.add(new Tuple2<>(new Tuple2<>(3, 3), "c")); - data.add(new Tuple2<>(new Tuple2<>(3, 6), "c")); - data.add(new Tuple2<>(new Tuple2<>(4, 9), "c")); - - TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) { - - List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<>(); - data.add(new Tuple3<>(new Tuple2<>(1, 3), "a", 2)); - data.add(new Tuple3<>(new Tuple2<>(1, 2), "a", 1)); - data.add(new Tuple3<>(new Tuple2<>(2, 1), "a", 3)); - data.add(new Tuple3<>(new Tuple2<>(2, 2), "b", 4)); - data.add(new Tuple3<>(new Tuple2<>(3, 3), "c", 5)); - data.add(new Tuple3<>(new Tuple2<>(3, 6), "c", 6)); - data.add(new Tuple3<>(new Tuple2<>(4, 9), "c", 7)); - - TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) { - List<Tuple2<byte[], Integer>> data = new ArrayList<>(); - data.add(new Tuple2<>(new byte[]{0, 4}, 1)); - data.add(new Tuple2<>(new byte[]{2, 0}, 1)); - data.add(new Tuple2<>(new byte[]{2, 0, 4}, 4)); - data.add(new Tuple2<>(new byte[]{2, 1}, 3)); - data.add(new Tuple2<>(new byte[]{0}, 0)); - data.add(new Tuple2<>(new byte[]{2, 0}, 1)); - - TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>( - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<String> getStringDataSet(ExecutionEnvironment env) { - - List<String> data = new ArrayList<>(); - data.add("Hi"); - data.add("Hello"); - data.add("Hello world"); - data.add("Hello world, how are you?"); - data.add("I am fine."); - data.add("Luke Skywalker"); - data.add("Random comment"); - data.add("LOL"); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env) { - - List<Integer> data = new ArrayList<>(); - data.add(1); - data.add(2); - data.add(2); - data.add(3); - data.add(3); - data.add(3); - data.add(4); - data.add(4); - data.add(4); - data.add(4); - data.add(5); - data.add(5); - data.add(5); - data.add(5); - data.add(5); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) { - - List<CustomType> data = new ArrayList<>(); - data.add(new CustomType(1, 0L, "Hi")); - data.add(new CustomType(2, 1L, "Hello")); - data.add(new CustomType(2, 2L, "Hello world")); - data.add(new CustomType(3, 3L, "Hello world, how are you?")); - data.add(new CustomType(3, 4L, "I am fine.")); - data.add(new CustomType(3, 5L, "Luke Skywalker")); - data.add(new CustomType(4, 6L, "Comment#1")); - data.add(new CustomType(4, 7L, "Comment#2")); - data.add(new CustomType(4, 8L, "Comment#3")); - data.add(new CustomType(4, 9L, "Comment#4")); - data.add(new CustomType(5, 10L, "Comment#5")); - data.add(new CustomType(5, 11L, "Comment#6")); - data.add(new CustomType(5, 12L, "Comment#7")); - data.add(new CustomType(5, 13L, "Comment#8")); - data.add(new CustomType(5, 14L, "Comment#9")); - data.add(new CustomType(6, 15L, "Comment#10")); - data.add(new CustomType(6, 16L, "Comment#11")); - data.add(new CustomType(6, 17L, "Comment#12")); - data.add(new CustomType(6, 18L, "Comment#13")); - data.add(new CustomType(6, 19L, "Comment#14")); - data.add(new CustomType(6, 20L, "Comment#15")); - - Collections.shuffle(data); - - return env.fromCollection(data); - - } - - public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) { - - List<CustomType> data = new ArrayList<>(); - data.add(new CustomType(1, 0L, "Hi")); - data.add(new CustomType(2, 1L, "Hello")); - data.add(new CustomType(2, 2L, "Hello world")); - - Collections.shuffle(data); - - return env.fromCollection(data); - - } - - public static class CustomType implements Serializable { - - private static final long serialVersionUID = 1L; - - public int myInt; - public long myLong; - public String myString; - - public CustomType() { - } - - public CustomType(int i, long l, String s) { - myInt = i; - myLong = l; - myString = s; - } - - @Override - public String toString() { - return myInt + "," + myLong + "," + myString; - } - } - - public static class CustomTypeComparator implements Comparator<CustomType> { - @Override - public int compare(CustomType o1, CustomType o2) { - int diff = o1.myInt - o2.myInt; - if (diff != 0) { - return diff; - } - diff = (int) (o1.myLong - o2.myLong); - return diff != 0 ? diff : o1.myString.compareTo(o2.myString); - } - - } - - public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) { - List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<>(); - data.add(new Tuple7<>(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new Tuple7<>(2, "Second", 20, 200, 2000L, "Two", 20000L)); - data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L)); - return env.fromCollection(data); - } - - public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { - List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>(); - data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First")); - data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second")); - data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third")); - - return env.fromCollection(data); - } - - public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<>(); - data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/)); - data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); - return env.fromCollection(data); - } - - public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<>(); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); - return env.fromCollection(data); - } - - public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<>(); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x - data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L)); - data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L)); - data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L)); - data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L)); - data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L)); - data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x - data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L)); - return env.fromCollection(data); - } - - public static class POJO { - public int number; - public String str; - public Tuple2<Integer, CustomType> nestedTupleWithCustom; - public NestedPojo nestedPojo; - public transient Long ignoreMe; - - public POJO(int i0, String s0, - int i1, int i2, long l0, String s1, - long l1) { - this.number = i0; - this.str = s0; - this.nestedTupleWithCustom = new Tuple2<>(i1, new CustomType(i2, l0, s1)); - this.nestedPojo = new NestedPojo(); - this.nestedPojo.longNumber = l1; - } - - public POJO() { - } - - @Override - public String toString() { - return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber; - } - } - - public static class NestedPojo { - public static Object ignoreMe; - public long longNumber; - - public NestedPojo() { - } - } - - public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) { - List<CrazyNested> data = new ArrayList<>(); - data.add(new CrazyNested("aa")); - data.add(new CrazyNested("bb")); - data.add(new CrazyNested("bb")); - data.add(new CrazyNested("cc")); - data.add(new CrazyNested("cc")); - data.add(new CrazyNested("cc")); - return env.fromCollection(data); - } - - public static class CrazyNested { - public CrazyNestedL1 nest_Lvl1; - public Long something; // test proper null-value handling - - public CrazyNested() { - } - - public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values - this(set); - something = s; - nest_Lvl1.a = second; - } - - public CrazyNested(String set) { - nest_Lvl1 = new CrazyNestedL1(); - nest_Lvl1.nest_Lvl2 = new CrazyNestedL2(); - nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3(); - nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4(); - nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set; - } - } - - public static class CrazyNestedL1 { - public String a; - public int b; - public CrazyNestedL2 nest_Lvl2; - } - - public static class CrazyNestedL2 { - public CrazyNestedL3 nest_Lvl3; - } - - public static class CrazyNestedL3 { - public CrazyNestedL4 nest_Lvl4; - } - - public static class CrazyNestedL4 { - public String f1nal; - } - - // Copied from TypeExtractorTest - public static class FromTuple extends Tuple3<String, String, Long> { - private static final long serialVersionUID = 1L; - public int special; - } - - public static class FromTupleWithCTor extends FromTuple { - - private static final long serialVersionUID = 1L; - - public FromTupleWithCTor() {} - - public FromTupleWithCTor(int special, long tupleField) { - this.special = special; - this.setField(tupleField, 2); - } - } - - public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) { - List<FromTupleWithCTor> data = new ArrayList<>(); - data.add(new FromTupleWithCTor(1, 10L)); // 3x - data.add(new FromTupleWithCTor(1, 10L)); - data.add(new FromTupleWithCTor(1, 10L)); - data.add(new FromTupleWithCTor(2, 20L)); // 2x - data.add(new FromTupleWithCTor(2, 20L)); - return env.fromCollection(data); - } - - public static class PojoContainingTupleAndWritable { - public int someInt; - public String someString; - public IntWritable hadoopFan; - public Tuple2<Long, Long> theTuple; - - public PojoContainingTupleAndWritable() { - } - - public PojoContainingTupleAndWritable(int i, long l1, long l2) { - hadoopFan = new IntWritable(i); - someInt = i; - theTuple = new Tuple2<>(l1, l2); - } - } - - public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) { - List<PojoContainingTupleAndWritable> data = new ArrayList<>(); - data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - return env.fromCollection(data); - } - - - - public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) { - List<PojoContainingTupleAndWritable> data = new ArrayList<>(); - data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x - data.add(new PojoContainingTupleAndWritable(2, 20L, 201L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 600L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 400L)); - return env.fromCollection(data); - } - - public static DataSet<Tuple3<Integer, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) { - List<Tuple3<Integer, CrazyNested, POJO>> data = new ArrayList<>(); - data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x - data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); - data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); - // POJO is not initialized according to the first two fields. - data.add(new Tuple3<>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x - return env.fromCollection(data); - } - - public static class Pojo1 { - public String a; - public String b; - - public Pojo1() {} - - public Pojo1(String a, String b) { - this.a = a; - this.b = b; - } - } - - public static class Pojo2 { - public String a2; - public String b2; - } - - public static class PojoWithMultiplePojos { - public Pojo1 p1; - public Pojo2 p2; - public Integer i0; - - public PojoWithMultiplePojos() { - } - - public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) { - p1 = new Pojo1(); - p1.a = a; - p1.b = b; - p2 = new Pojo2(); - p2.a2 = a1; - p2.b2 = b1; - this.i0 = i0; - } - } - - public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) { - List<PojoWithMultiplePojos> data = new ArrayList<>(); - data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3)); - data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3)); - return env.fromCollection(data); - } - - public enum Category { - CAT_A, CAT_B - } - - public static class PojoWithDateAndEnum { - public String group; - public Date date; - public Category cat; - } - - public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) { - List<PojoWithDateAndEnum> data = new ArrayList<>(); - - PojoWithDateAndEnum one = new PojoWithDateAndEnum(); - one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A; - data.add(one); - - PojoWithDateAndEnum two = new PojoWithDateAndEnum(); - two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A; - data.add(two); - - PojoWithDateAndEnum three = new PojoWithDateAndEnum(); - three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B; - data.add(three); - - return env.fromCollection(data); - } - - public static class PojoWithCollection { - public List<Pojo1> pojos; - public int key; - public java.sql.Date sqlDate; - public BigInteger bigInt; - public BigDecimal bigDecimalKeepItNull; - public BigInt scalaBigInt; - public List<Object> mixed; - - @Override - public String toString() { - return "PojoWithCollection{" + - "pojos.size()=" + pojos.size() + - ", key=" + key + - ", sqlDate=" + sqlDate + - ", bigInt=" + bigInt + - ", bigDecimalKeepItNull=" + bigDecimalKeepItNull + - ", scalaBigInt=" + scalaBigInt + - ", mixed=" + mixed + - '}'; - } - } - - public static class PojoWithCollectionGeneric { - public List<Pojo1> pojos; - public int key; - public java.sql.Date sqlDate; - public BigInteger bigInt; - public BigDecimal bigDecimalKeepItNull; - public BigInt scalaBigInt; - public List<Object> mixed; - private PojoWithDateAndEnum makeMeGeneric; - - @Override - public String toString() { - return "PojoWithCollection{" + - "pojos.size()=" + pojos.size() + - ", key=" + key + - ", sqlDate=" + sqlDate + - ", bigInt=" + bigInt + - ", bigDecimalKeepItNull=" + bigDecimalKeepItNull + - ", scalaBigInt=" + scalaBigInt + - ", mixed=" + mixed + - '}'; - } - } - - public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) { - List<PojoWithCollection> data = new ArrayList<>(); - - List<Pojo1> pojosList1 = new ArrayList<>(); - pojosList1.add(new Pojo1("a", "aa")); - pojosList1.add(new Pojo1("b", "bb")); - - List<Pojo1> pojosList2 = new ArrayList<>(); - pojosList2.add(new Pojo1("a2", "aa2")); - pojosList2.add(new Pojo1("b2", "bb2")); - - PojoWithCollection pwc1 = new PojoWithCollection(); - pwc1.pojos = pojosList1; - pwc1.key = 0; - pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); - pwc1.scalaBigInt = BigInt.int2bigInt(10); - pwc1.bigDecimalKeepItNull = null; - - // use calendar to make it stable across time zones - GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18); - pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis()); - pwc1.mixed = new ArrayList<>(); - Map<String, Integer> map = new HashMap<>(); - map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3); - pwc1.mixed.add(map); - pwc1.mixed.add(new File("/this/is/wrong")); - pwc1.mixed.add("uhlala"); - - PojoWithCollection pwc2 = new PojoWithCollection(); - pwc2.pojos = pojosList2; - pwc2.key = 0; - pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); - pwc2.scalaBigInt = BigInt.int2bigInt(31104000); - pwc2.bigDecimalKeepItNull = null; - - GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3); - pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976 - - - data.add(pwc1); - data.add(pwc2); - - return env.fromCollection(data); - } - -} -
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java deleted file mode 100644 index 04a7bc5..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java +++ /dev/null @@ -1,730 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.javaApiOperators.util; - -import java.io.File; -import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.StringValue; -import org.apache.hadoop.io.IntWritable; - -import scala.math.BigInt; - -/** - * ####################################################################################################### - * - * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. - * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! - * - * ####################################################################################################### - */ -public class ValueCollectionDataSets { - - public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) { - List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>(); - - data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi"))); - data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello"))); - data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world"))); - data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new StringValue("Hello world, how are you?"))); - data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new StringValue("I am fine."))); - data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new StringValue("Luke Skywalker"))); - data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new StringValue("Comment#1"))); - data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new StringValue("Comment#2"))); - data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new StringValue("Comment#3"))); - data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new StringValue("Comment#4"))); - data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new StringValue("Comment#5"))); - data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new StringValue("Comment#6"))); - data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new StringValue("Comment#7"))); - data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new StringValue("Comment#8"))); - data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new StringValue("Comment#9"))); - data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new StringValue("Comment#10"))); - data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new StringValue("Comment#11"))); - data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new StringValue("Comment#12"))); - data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new StringValue("Comment#13"))); - data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new StringValue("Comment#14"))); - data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new StringValue("Comment#15"))); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) { - List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>(); - - data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi"))); - data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello"))); - data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world"))); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) { - List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>(); - - data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new IntValue(4), new StringValue("ABC"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new IntValue(5), new StringValue("BCD"), new LongValue(3l))); - data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new IntValue(6), new StringValue("CDE"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new IntValue(7), new StringValue("DEF"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new IntValue(8), new StringValue("EFG"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new IntValue(9), new StringValue("FGH"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new IntValue(10), new StringValue("GHI"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new IntValue(11), new StringValue("HIJ"), new LongValue(3l))); - data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new IntValue(12), new StringValue("IJK"), new LongValue(3l))); - data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new IntValue(13), new StringValue("JKL"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new IntValue(14), new StringValue("KLM"), new LongValue(2l))); - - Collections.shuffle(data); - - TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new - TupleTypeInfo<>( - ValueTypeInfo.INT_VALUE_TYPE_INFO, - ValueTypeInfo.LONG_VALUE_TYPE_INFO, - ValueTypeInfo.INT_VALUE_TYPE_INFO, - ValueTypeInfo.STRING_VALUE_TYPE_INFO, - ValueTypeInfo.LONG_VALUE_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) { - List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>(); - - data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l))); - data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l))); - data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l))); - - Collections.shuffle(data); - - TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new - TupleTypeInfo<>( - ValueTypeInfo.INT_VALUE_TYPE_INFO, - ValueTypeInfo.LONG_VALUE_TYPE_INFO, - ValueTypeInfo.INT_VALUE_TYPE_INFO, - ValueTypeInfo.STRING_VALUE_TYPE_INFO, - ValueTypeInfo.LONG_VALUE_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) { - List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>(); - - data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three"))); - - TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new - TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO), - ValueTypeInfo.STRING_VALUE_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) { - List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>(); - - data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(3)), new StringValue("a"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(2)), new StringValue("a"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(1)), new StringValue("a"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("b"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("c"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(6)), new StringValue("c"))); - data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new IntValue(9)), new StringValue("c"))); - - TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new - TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO), - ValueTypeInfo.STRING_VALUE_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) { - List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>(); - - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6))); - data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7))); - - TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type = new - TupleTypeInfo<>( - new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO), - ValueTypeInfo.STRING_VALUE_TYPE_INFO, - ValueTypeInfo.INT_VALUE_TYPE_INFO - ); - - return env.fromCollection(data, type); - } - - public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) { - List<StringValue> data = new ArrayList<>(); - - data.add(new StringValue("Hi")); - data.add(new StringValue("Hello")); - data.add(new StringValue("Hello world")); - data.add(new StringValue("Hello world, how are you?")); - data.add(new StringValue("I am fine.")); - data.add(new StringValue("Luke Skywalker")); - data.add(new StringValue("Random comment")); - data.add(new StringValue("LOL")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) { - List<IntValue> data = new ArrayList<>(); - - data.add(new IntValue(1)); - data.add(new IntValue(2)); - data.add(new IntValue(2)); - data.add(new IntValue(3)); - data.add(new IntValue(3)); - data.add(new IntValue(3)); - data.add(new IntValue(4)); - data.add(new IntValue(4)); - data.add(new IntValue(4)); - data.add(new IntValue(4)); - data.add(new IntValue(5)); - data.add(new IntValue(5)); - data.add(new IntValue(5)); - data.add(new IntValue(5)); - data.add(new IntValue(5)); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) { - List<CustomType> data = new ArrayList<CustomType>(); - - data.add(new CustomType(1, 0l, "Hi")); - data.add(new CustomType(2, 1l, "Hello")); - data.add(new CustomType(2, 2l, "Hello world")); - data.add(new CustomType(3, 3l, "Hello world, how are you?")); - data.add(new CustomType(3, 4l, "I am fine.")); - data.add(new CustomType(3, 5l, "Luke Skywalker")); - data.add(new CustomType(4, 6l, "Comment#1")); - data.add(new CustomType(4, 7l, "Comment#2")); - data.add(new CustomType(4, 8l, "Comment#3")); - data.add(new CustomType(4, 9l, "Comment#4")); - data.add(new CustomType(5, 10l, "Comment#5")); - data.add(new CustomType(5, 11l, "Comment#6")); - data.add(new CustomType(5, 12l, "Comment#7")); - data.add(new CustomType(5, 13l, "Comment#8")); - data.add(new CustomType(5, 14l, "Comment#9")); - data.add(new CustomType(6, 15l, "Comment#10")); - data.add(new CustomType(6, 16l, "Comment#11")); - data.add(new CustomType(6, 17l, "Comment#12")); - data.add(new CustomType(6, 18l, "Comment#13")); - data.add(new CustomType(6, 19l, "Comment#14")); - data.add(new CustomType(6, 20l, "Comment#15")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) { - List<CustomType> data = new ArrayList<CustomType>(); - - data.add(new CustomType(1, 0l, "Hi")); - data.add(new CustomType(2, 1l, "Hello")); - data.add(new CustomType(2, 2l, "Hello world")); - - Collections.shuffle(data); - - return env.fromCollection(data); - } - - public static class CustomType implements Serializable { - - private static final long serialVersionUID = 1L; - - public IntValue myInt; - public LongValue myLong; - public StringValue myString; - - public CustomType() { - } - - public CustomType(int i, long l, String s) { - myInt = new IntValue(i); - myLong = new LongValue(l); - myString = new StringValue(s); - } - - @Override - public String toString() { - return myInt + "," + myLong + "," + myString; - } - } - - public static class CustomTypeComparator implements Comparator<CustomType> { - - @Override - public int compare(CustomType o1, CustomType o2) { - int diff = o1.myInt.getValue() - o2.myInt.getValue(); - if (diff != 0) { - return diff; - } - diff = (int) (o1.myLong.getValue() - o2.myLong.getValue()); - return diff != 0 ? diff : o1.myString.getValue().compareTo(o2.myString.getValue()); - } - - } - - public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) { - List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> data = new ArrayList<>(); - - data.add(new Tuple7<>(new IntValue(1), new StringValue("First"), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new LongValue(10000L))); - data.add(new Tuple7<>(new IntValue(2), new StringValue("Second"), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new LongValue(20000L))); - data.add(new Tuple7<>(new IntValue(3), new StringValue("Third"), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new LongValue(30000L))); - - return env.fromCollection(data); - } - - public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { - List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> data = new ArrayList<>(); - - data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new IntValue(1), new StringValue("First"))); - data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new IntValue(2), new StringValue("Second"))); - data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new IntValue(3), new StringValue("Third"))); - - return env.fromCollection(data); - } - - public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<POJO>(); - - data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/)); - data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); - - return env.fromCollection(data); - } - - public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<POJO>(); - - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); - data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L)); - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x - data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); - - return env.fromCollection(data); - } - - public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) { - List<POJO> data = new ArrayList<POJO>(); - - data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x - data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L)); - data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L)); - data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L)); - data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L)); - data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L)); - data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x - data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L)); - - return env.fromCollection(data); - } - - public static class POJO { - public IntValue number; - public StringValue str; - public Tuple2<IntValue, CustomType> nestedTupleWithCustom; - public NestedPojo nestedPojo; - public transient LongValue ignoreMe; - - public POJO(int i0, String s0, - int i1, int i2, long l0, String s1, - long l1) { - this.number = new IntValue(i0); - this.str = new StringValue(s0); - this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1)); - this.nestedPojo = new NestedPojo(); - this.nestedPojo.longNumber = new LongValue(l1); - } - - public POJO() { - } - - @Override - public String toString() { - return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber; - } - } - - public static class NestedPojo { - public static Object ignoreMe; - public LongValue longNumber; - - public NestedPojo() { - } - } - - public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) { - List<CrazyNested> data = new ArrayList<CrazyNested>(); - - data.add(new CrazyNested("aa")); - data.add(new CrazyNested("bb")); - data.add(new CrazyNested("bb")); - data.add(new CrazyNested("cc")); - data.add(new CrazyNested("cc")); - data.add(new CrazyNested("cc")); - - return env.fromCollection(data); - } - - public static class CrazyNested { - public CrazyNestedL1 nest_Lvl1; - public LongValue something; // test proper null-value handling - - public CrazyNested() { - } - - public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values - this(set); - something = new LongValue(s); - nest_Lvl1.a = new StringValue(second); - } - - public CrazyNested(String set) { - nest_Lvl1 = new CrazyNestedL1(); - nest_Lvl1.nest_Lvl2 = new CrazyNestedL2(); - nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3(); - nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4(); - nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new StringValue(set); - } - } - - public static class CrazyNestedL1 { - public StringValue a; - public IntValue b; - public CrazyNestedL2 nest_Lvl2; - } - - public static class CrazyNestedL2 { - public CrazyNestedL3 nest_Lvl3; - } - - public static class CrazyNestedL3 { - public CrazyNestedL4 nest_Lvl4; - } - - public static class CrazyNestedL4 { - public StringValue f1nal; - } - - // Copied from TypeExtractorTest - public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> { - private static final long serialVersionUID = 1L; - public IntValue special; - } - - public static class FromTupleWithCTor extends FromTuple { - - private static final long serialVersionUID = 1L; - - public FromTupleWithCTor() {} - - public FromTupleWithCTor(int special, long tupleField) { - this.special = new IntValue(special); - this.setField(new LongValue(tupleField), 2); - } - } - - public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) { - List<FromTupleWithCTor> data = new ArrayList<>(); - data.add(new FromTupleWithCTor(1, 10L)); // 3x - data.add(new FromTupleWithCTor(1, 10L)); - data.add(new FromTupleWithCTor(1, 10L)); - data.add(new FromTupleWithCTor(2, 20L)); // 2x - data.add(new FromTupleWithCTor(2, 20L)); - return env.fromCollection(data); - } - - public static class PojoContainingTupleAndWritable { - public IntValue someInt; - public StringValue someString; - public IntWritable hadoopFan; - public Tuple2<LongValue, LongValue> theTuple; - - public PojoContainingTupleAndWritable() { - } - - public PojoContainingTupleAndWritable(int i, long l1, long l2) { - hadoopFan = new IntWritable(i); - someInt = new IntValue(i); - theTuple = new Tuple2<>(new LongValue(l1), new LongValue(l2)); - } - } - - public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) { - List<PojoContainingTupleAndWritable> data = new ArrayList<>(); - data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); - return env.fromCollection(data); - } - - - - public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) { - List<PojoContainingTupleAndWritable> data = new ArrayList<>(); - data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x - data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x - data.add(new PojoContainingTupleAndWritable(2, 20L, 201L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 200L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 600L)); - data.add(new PojoContainingTupleAndWritable(2, 30L, 400L)); - return env.fromCollection(data); - } - - public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) { - List<Tuple3<IntValue, CrazyNested, POJO>> data = new ArrayList<>(); - data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x - data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); - data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); - // POJO is not initialized according to the first two fields. - data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x - return env.fromCollection(data); - } - - public static class Pojo1 { - public StringValue a; - public StringValue b; - - public Pojo1() {} - - public Pojo1(String a, String b) { - this.a = new StringValue(a); - this.b = new StringValue(b); - } - } - - public static class Pojo2 { - public StringValue a2; - public StringValue b2; - } - - public static class PojoWithMultiplePojos { - public Pojo1 p1; - public Pojo2 p2; - public IntValue i0; - - public PojoWithMultiplePojos() { - } - - public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) { - p1 = new Pojo1(); - p1.a = new StringValue(a); - p1.b = new StringValue(b); - p2 = new Pojo2(); - p2.a2 = new StringValue(a1); - p2.b2 = new StringValue(b1); - this.i0 = new IntValue(i0); - } - } - - public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) { - List<PojoWithMultiplePojos> data = new ArrayList<>(); - data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2)); - data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3)); - data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3)); - return env.fromCollection(data); - } - - public enum Category { - CAT_A, CAT_B; - } - - public static class PojoWithDateAndEnum { - public StringValue group; - public Date date; - public Category cat; - } - - public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) { - List<PojoWithDateAndEnum> data = new ArrayList<PojoWithDateAndEnum>(); - - PojoWithDateAndEnum one = new PojoWithDateAndEnum(); - one.group = new StringValue("a"); - one.date = new Date(666); - one.cat = Category.CAT_A; - data.add(one); - - PojoWithDateAndEnum two = new PojoWithDateAndEnum(); - two.group = new StringValue("a"); - two.date = new Date(666); - two.cat = Category.CAT_A; - data.add(two); - - PojoWithDateAndEnum three = new PojoWithDateAndEnum(); - three.group = new StringValue("b"); - three.date = new Date(666); - three.cat = Category.CAT_B; - data.add(three); - - return env.fromCollection(data); - } - - public static class PojoWithCollection { - public List<Pojo1> pojos; - public IntValue key; - public java.sql.Date sqlDate; - public BigInteger bigInt; - public BigDecimal bigDecimalKeepItNull; - public BigInt scalaBigInt; - public List<Object> mixed; - - @Override - public String toString() { - return "PojoWithCollection{" + - "pojos.size()=" + pojos.size() + - ", key=" + key + - ", sqlDate=" + sqlDate + - ", bigInt=" + bigInt + - ", bigDecimalKeepItNull=" + bigDecimalKeepItNull + - ", scalaBigInt=" + scalaBigInt + - ", mixed=" + mixed + - '}'; - } - } - - public static class PojoWithCollectionGeneric { - public List<Pojo1> pojos; - public IntValue key; - public java.sql.Date sqlDate; - public BigInteger bigInt; - public BigDecimal bigDecimalKeepItNull; - public BigInt scalaBigInt; - public List<Object> mixed; - private PojoWithDateAndEnum makeMeGeneric; - - @Override - public String toString() { - return "PojoWithCollection{" + - "pojos.size()=" + pojos.size() + - ", key=" + key + - ", sqlDate=" + sqlDate + - ", bigInt=" + bigInt + - ", bigDecimalKeepItNull=" + bigDecimalKeepItNull + - ", scalaBigInt=" + scalaBigInt + - ", mixed=" + mixed + - '}'; - } - } - - public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) { - List<PojoWithCollection> data = new ArrayList<>(); - - List<Pojo1> pojosList1 = new ArrayList<>(); - pojosList1.add(new Pojo1("a", "aa")); - pojosList1.add(new Pojo1("b", "bb")); - - List<Pojo1> pojosList2 = new ArrayList<>(); - pojosList2.add(new Pojo1("a2", "aa2")); - pojosList2.add(new Pojo1("b2", "bb2")); - - PojoWithCollection pwc1 = new PojoWithCollection(); - pwc1.pojos = pojosList1; - pwc1.key = new IntValue(0); - pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); - pwc1.scalaBigInt = BigInt.int2bigInt(10); - pwc1.bigDecimalKeepItNull = null; - - // use calendar to make it stable across time zones - GregorianCalendar gcl1 = new GregorianCalendar(2033, 04, 18); - pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis()); - pwc1.mixed = new ArrayList<Object>(); - Map<StringValue, IntValue> map = new HashMap<>(); - map.put(new StringValue("someKey"), new IntValue(1)); - pwc1.mixed.add(map); - pwc1.mixed.add(new File("/this/is/wrong")); - pwc1.mixed.add("uhlala"); - - PojoWithCollection pwc2 = new PojoWithCollection(); - pwc2.pojos = pojosList2; - pwc2.key = new IntValue(0); - pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); - pwc2.scalaBigInt = BigInt.int2bigInt(31104000); - pwc2.bigDecimalKeepItNull = null; - - GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3); - pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976 - - data.add(pwc1); - data.add(pwc2); - - return env.fromCollection(data); - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java index aabe7c0..6413a3b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java @@ -15,13 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.test.manual; import org.apache.flink.types.parser.FieldParserTest; import org.junit.BeforeClass; import org.junit.Test; - import org.reflections.Reflections; import org.reflections.scanners.MemberUsageScanner; import org.reflections.util.ClasspathHelper; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals; /** * Tests via reflection that certain methods are not called in Flink. - * + * * <p>Forbidden calls include: * - Byte / String conversions that do not specify an explicit charset * because they produce different results in different locales @@ -116,11 +116,10 @@ public class CheckForbiddenMethodsUsage { .addUrls(ClasspathHelper.forPackage("org.apache.flink")) .addScanners(new MemberUsageScanner())); - for (ForbiddenCall forbiddenCall : forbiddenCalls) { final Set<Member> methodUsages = forbiddenCall.getUsages(reflections); methodUsages.removeAll(forbiddenCall.getExclusions()); - assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size()); + assertEquals("Unexpected calls: " + methodUsages, 0, methodUsages.size()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java index 0692196..f02cf1c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java @@ -40,23 +40,26 @@ import java.util.List; import static org.junit.Assert.fail; +/** + * Manual test for growing hash tables. + */ public class HashTableRecordWidthCombinations { public static void main(String[] args) throws Exception { @SuppressWarnings("unchecked") - final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer = + final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer = new TupleSerializer<Tuple2<Long, byte[]>>( (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class, new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE }); - + final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE; final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>( new int[] {0}, new TypeComparator<?>[] { new LongComparator(true) }, new TypeSerializer<?>[] { LongSerializer.INSTANCE }); - + final TypeComparator<Long> probeComparator = new LongComparator(true); final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() { @@ -85,7 +88,7 @@ public class HashTableRecordWidthCombinations { final IOManager ioMan = new IOManagerAsync(); try { - final int pageSize = 32*1024; + final int pageSize = 32 * 1024; final int numSegments = 34; for (int num = 3400; num < 3550; num++) { @@ -151,7 +154,7 @@ public class HashTableRecordWidthCombinations { try { while (table.nextRecord()) { MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator(); - while (matches.next() != null); + while (matches.next() != null) {} } } catch (RuntimeException e) { @@ -176,11 +179,11 @@ public class HashTableRecordWidthCombinations { ioMan.shutdown(); } } - + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - + private static List<MemorySegment> getMemory(int numSegments, int segmentSize) { ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments); for (int i = 0; i < numSegments; i++) { @@ -188,7 +191,7 @@ public class HashTableRecordWidthCombinations { } return list; } - + private static void checkNoTempFilesRemain(IOManager ioManager) { for (File dir : ioManager.getSpillingDirectories()) { for (String file : dir.list()) { http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java index 9821b05..c69e6fd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java @@ -18,14 +18,6 @@ package org.apache.flink.test.manual; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.Random; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -41,13 +33,24 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.util.MutableObjectIterator; + import org.junit.Assert; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Random; + +/** + * Test {@link UnilateralSortMerger} on a large set of {@code String}. + */ public class MassiveStringSorting { private static final long SEED = 347569784659278346L; - - + public void testStringSorting() { File input = null; File sorted = null; @@ -55,12 +58,12 @@ public class MassiveStringSorting { try { // the source file input = generateFileWithStrings(300000, "http://some-uri.com/that/is/a/common/prefix/to/all"); - + // the sorted file sorted = File.createTempFile("sorted_strings", "txt"); - - String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; - + + String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; + Process p = null; try { p = Runtime.getRuntime().exec(command); @@ -74,37 +77,37 @@ public class MassiveStringSorting { p.destroy(); } } - + // sort the data UnilateralSortMerger<String> sorter = null; BufferedReader reader = null; BufferedReader verifyReader = null; - + try { MemoryManager mm = new MemoryManager(1024 * 1024, 1); IOManager ioMan = new IOManagerAsync(); - + TypeSerializer<String> serializer = StringSerializer.INSTANCE; TypeComparator<String> comparator = new StringComparator(true); - + reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader); - + sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f, true /* use large record handler */, false); MutableObjectIterator<String> sortedData = sorter.getIterator(); - + reader.close(); - + // verify verifyReader = new BufferedReader(new FileReader(sorted)); String next; - + while ((next = verifyReader.readLine()) != null) { String nextFromStratoSort = sortedData.next(""); - + Assert.assertNotNull(nextFromStratoSort); Assert.assertEquals(next, nextFromStratoSort); } @@ -135,23 +138,23 @@ public class MassiveStringSorting { } } } - + @SuppressWarnings("unchecked") public void testStringTuplesSorting() { - final int NUM_STRINGS = 300000; - + final int numStrings = 300000; + File input = null; File sorted = null; try { // the source file - input = generateFileWithStringTuples(NUM_STRINGS, "http://some-uri.com/that/is/a/common/prefix/to/all"); - + input = generateFileWithStringTuples(numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all"); + // the sorted file sorted = File.createTempFile("sorted_strings", "txt"); - - String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; - + + String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; + Process p = null; try { p = Runtime.getRuntime().exec(command); @@ -165,33 +168,31 @@ public class MassiveStringSorting { p.destroy(); } } - + // sort the data UnilateralSortMerger<Tuple2<String, String[]>> sorter = null; BufferedReader reader = null; BufferedReader verifyReader = null; - + try { MemoryManager mm = new MemoryManager(1024 * 1024, 1); IOManager ioMan = new IOManagerAsync(); - - TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) + + TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) TypeInfoParser.<Tuple2<String, String[]>>parse("Tuple2<String, String[]>"); TypeSerializer<Tuple2<String, String[]>> serializer = typeInfo.createSerializer(new ExecutionConfig()); TypeComparator<Tuple2<String, String[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig()); - + reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader); - + sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, true /* use large record handler */, false); - - // use this part to verify that all if good when sorting in memory - + // List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024)); // NormalizedKeySorter<Tuple2<String, String[]>> nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory); // @@ -200,36 +201,36 @@ public class MassiveStringSorting { // while ((wi = inputIterator.next(wi)) != null) { // Assert.assertTrue(nks.write(wi)); // } -// +// // new QuickSort().sort(nks); // } -// +// // MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator(); - + MutableObjectIterator<Tuple2<String, String[]>> sortedData = sorter.getIterator(); reader.close(); - + // verify verifyReader = new BufferedReader(new FileReader(sorted)); MutableObjectIterator<Tuple2<String, String[]>> verifyIterator = new StringTupleReaderMutableObjectIterator(verifyReader); - + Tuple2<String, String[]> next = new Tuple2<String, String[]>("", new String[0]); Tuple2<String, String[]> nextFromStratoSort = new Tuple2<String, String[]>("", new String[0]); - + int num = 0; - + while ((next = verifyIterator.next(next)) != null) { num++; - + nextFromStratoSort = sortedData.next(nextFromStratoSort); Assert.assertNotNull(nextFromStratoSort); - + Assert.assertEquals(next.f0, nextFromStratoSort.f0); Assert.assertArrayEquals(next.f1, nextFromStratoSort.f1); } - + Assert.assertNull(sortedData.next(nextFromStratoSort)); - Assert.assertEquals(NUM_STRINGS, num); + Assert.assertEquals(numStrings, num); } finally { @@ -260,15 +261,15 @@ public class MassiveStringSorting { } // -------------------------------------------------------------------------------------------- - + private static final class StringReaderMutableObjectIterator implements MutableObjectIterator<String> { - + private final BufferedReader reader; public StringReaderMutableObjectIterator(BufferedReader reader) { this.reader = reader; } - + @Override public String next(String reuse) throws IOException { return reader.readLine(); @@ -279,22 +280,22 @@ public class MassiveStringSorting { return reader.readLine(); } } - + private static final class StringTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<String, String[]>> { - + private final BufferedReader reader; public StringTupleReaderMutableObjectIterator(BufferedReader reader) { this.reader = reader; } - + @Override public Tuple2<String, String[]> next(Tuple2<String, String[]> reuse) throws IOException { String line = reader.readLine(); if (line == null) { return null; } - + String[] parts = line.split(" "); reuse.f0 = parts[0]; reuse.f1 = parts; @@ -306,31 +307,31 @@ public class MassiveStringSorting { return next(new Tuple2<String, String[]>()); } } - + // -------------------------------------------------------------------------------------------- - + private File generateFileWithStrings(int numStrings, String prefix) throws IOException { final Random rnd = new Random(SEED); - + final StringBuilder bld = new StringBuilder(); final int resetValue = prefix.length(); - + bld.append(prefix); - + File f = File.createTempFile("strings", "txt"); BufferedWriter wrt = null; try { wrt = new BufferedWriter(new FileWriter(f)); - - for (int i = 0 ; i < numStrings; i++) { + + for (int i = 0; i < numStrings; i++) { bld.setLength(resetValue); - + int len = rnd.nextInt(20) + 300; for (int k = 0; k < len; k++) { char c = (char) (rnd.nextInt(80) + 40); bld.append(c); } - + String str = bld.toString(); wrt.write(str); wrt.newLine(); @@ -338,52 +339,52 @@ public class MassiveStringSorting { } finally { wrt.close(); } - + return f; } - + private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException { final Random rnd = new Random(SEED); - + final StringBuilder bld = new StringBuilder(); File f = File.createTempFile("strings", "txt"); BufferedWriter wrt = null; try { wrt = new BufferedWriter(new FileWriter(f)); - - for (int i = 0 ; i < numStrings; i++) { + + for (int i = 0; i < numStrings; i++) { bld.setLength(0); - + int numComps = rnd.nextInt(5) + 1; - + for (int z = 0; z < numComps; z++) { if (z > 0) { bld.append(' '); } bld.append(prefix); - + int len = rnd.nextInt(20) + 10; for (int k = 0; k < len; k++) { char c = (char) (rnd.nextInt(80) + 40); bld.append(c); } } - + String str = bld.toString(); - + wrt.write(str); wrt.newLine(); } } finally { wrt.close(); } - + return f; } - + // -------------------------------------------------------------------------------------------- - + public static void main(String[] args) { new MassiveStringSorting().testStringSorting(); new MassiveStringSorting().testStringTuplesSorting(); http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java index 9e37b79..453aa14 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java @@ -18,14 +18,6 @@ package org.apache.flink.test.manual; -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.Random; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -42,12 +34,24 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.types.StringValue; import org.apache.flink.util.MutableObjectIterator; + import org.junit.Assert; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Random; + +/** + * Test {@link UnilateralSortMerger} on a large set of {@link StringValue}. + */ public class MassiveStringValueSorting { private static final long SEED = 347569784659278346L; - + public void testStringValueSorting() { File input = null; File sorted = null; @@ -55,12 +59,12 @@ public class MassiveStringValueSorting { try { // the source file input = generateFileWithStrings(300000, "http://some-uri.com/that/is/a/common/prefix/to/all"); - + // the sorted file sorted = File.createTempFile("sorted_strings", "txt"); - - String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; - + + String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; + Process p = null; try { p = Runtime.getRuntime().exec(command); @@ -74,38 +78,38 @@ public class MassiveStringValueSorting { p.destroy(); } } - + // sort the data UnilateralSortMerger<StringValue> sorter = null; BufferedReader reader = null; BufferedReader verifyReader = null; - + try { MemoryManager mm = new MemoryManager(1024 * 1024, 1); IOManager ioMan = new IOManagerAsync(); - + TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class); TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class); - + reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader); - + sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, true /* use large record handler */, true); MutableObjectIterator<StringValue> sortedData = sorter.getIterator(); - + reader.close(); - + // verify verifyReader = new BufferedReader(new FileReader(sorted)); String nextVerify; StringValue nextFromFlinkSort = new StringValue(); - + while ((nextVerify = verifyReader.readLine()) != null) { nextFromFlinkSort = sortedData.next(nextFromFlinkSort); - + Assert.assertNotNull(nextFromFlinkSort); Assert.assertEquals(nextVerify, nextFromFlinkSort.getValue()); } @@ -138,23 +142,23 @@ public class MassiveStringValueSorting { } } } - + @SuppressWarnings("unchecked") public void testStringValueTuplesSorting() { - final int NUM_STRINGS = 300000; - + final int numStrings = 300000; + File input = null; File sorted = null; try { // the source file - input = generateFileWithStringTuples(NUM_STRINGS, "http://some-uri.com/that/is/a/common/prefix/to/all"); - + input = generateFileWithStringTuples(numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all"); + // the sorted file sorted = File.createTempFile("sorted_strings", "txt"); - - String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; - + + String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""}; + Process p = null; try { p = Runtime.getRuntime().exec(command); @@ -168,33 +172,31 @@ public class MassiveStringValueSorting { p.destroy(); } } - + // sort the data UnilateralSortMerger<Tuple2<StringValue, StringValue[]>> sorter = null; BufferedReader reader = null; BufferedReader verifyReader = null; - + try { MemoryManager mm = new MemoryManager(1024 * 1024, 1); IOManager ioMan = new IOManagerAsync(); - + TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>) TypeInfoParser.<Tuple2<StringValue, StringValue[]>>parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>"); TypeSerializer<Tuple2<StringValue, StringValue[]>> serializer = typeInfo.createSerializer(new ExecutionConfig()); TypeComparator<Tuple2<StringValue, StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig()); - + reader = new BufferedReader(new FileReader(input)); MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); - + sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, true /* use large record handler */, false); - - // use this part to verify that all if good when sorting in memory - + // List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024)); // NormalizedKeySorter<Tuple2<String, String[]>> nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory); // @@ -203,36 +205,36 @@ public class MassiveStringValueSorting { // while ((wi = inputIterator.next(wi)) != null) { // Assert.assertTrue(nks.write(wi)); // } -// +// // new QuickSort().sort(nks); // } -// +// // MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator(); - + MutableObjectIterator<Tuple2<StringValue, StringValue[]>> sortedData = sorter.getIterator(); reader.close(); - + // verify verifyReader = new BufferedReader(new FileReader(sorted)); MutableObjectIterator<Tuple2<StringValue, StringValue[]>> verifyIterator = new StringValueTupleReaderMutableObjectIterator(verifyReader); - + Tuple2<StringValue, StringValue[]> nextVerify = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]); Tuple2<StringValue, StringValue[]> nextFromFlinkSort = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]); - + int num = 0; - + while ((nextVerify = verifyIterator.next(nextVerify)) != null) { num++; - + nextFromFlinkSort = sortedData.next(nextFromFlinkSort); Assert.assertNotNull(nextFromFlinkSort); - + Assert.assertEquals(nextVerify.f0, nextFromFlinkSort.f0); Assert.assertArrayEquals(nextVerify.f1, nextFromFlinkSort.f1); } - + Assert.assertNull(sortedData.next(nextFromFlinkSort)); - Assert.assertEquals(NUM_STRINGS, num); + Assert.assertEquals(numStrings, num); } finally { @@ -265,23 +267,23 @@ public class MassiveStringValueSorting { } // -------------------------------------------------------------------------------------------- - + private static final class StringValueReaderMutableObjectIterator implements MutableObjectIterator<StringValue> { - + private final BufferedReader reader; public StringValueReaderMutableObjectIterator(BufferedReader reader) { this.reader = reader; } - + @Override public StringValue next(StringValue reuse) throws IOException { String line = reader.readLine(); - + if (line == null) { return null; } - + reuse.setValue(line); return reuse; } @@ -291,30 +293,30 @@ public class MassiveStringValueSorting { return next(new StringValue()); } } - + private static final class StringValueTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> { - + private final BufferedReader reader; public StringValueTupleReaderMutableObjectIterator(BufferedReader reader) { this.reader = reader; } - + @Override public Tuple2<StringValue, StringValue[]> next(Tuple2<StringValue, StringValue[]> reuse) throws IOException { String line = reader.readLine(); if (line == null) { return null; } - + String[] parts = line.split(" "); reuse.f0.setValue(parts[0]); reuse.f1 = new StringValue[parts.length]; - + for (int i = 0; i < parts.length; i++) { reuse.f1[i] = new StringValue(parts[i]); } - + return reuse; } @@ -323,31 +325,31 @@ public class MassiveStringValueSorting { return next(new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0])); } } - + // -------------------------------------------------------------------------------------------- - + private File generateFileWithStrings(int numStrings, String prefix) throws IOException { final Random rnd = new Random(SEED); - + final StringBuilder bld = new StringBuilder(); final int resetValue = prefix.length(); - + bld.append(prefix); - + File f = File.createTempFile("strings", "txt"); BufferedWriter wrt = null; try { wrt = new BufferedWriter(new FileWriter(f)); - - for (int i = 0 ; i < numStrings; i++) { + + for (int i = 0; i < numStrings; i++) { bld.setLength(resetValue); - + int len = rnd.nextInt(20) + 300; for (int k = 0; k < len; k++) { char c = (char) (rnd.nextInt(80) + 40); bld.append(c); } - + String str = bld.toString(); wrt.write(str); wrt.newLine(); @@ -357,40 +359,40 @@ public class MassiveStringValueSorting { wrt.close(); } } - + return f; } - + private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException { final Random rnd = new Random(SEED); - + final StringBuilder bld = new StringBuilder(); File f = File.createTempFile("strings", "txt"); BufferedWriter wrt = null; try { wrt = new BufferedWriter(new FileWriter(f)); - - for (int i = 0 ; i < numStrings; i++) { + + for (int i = 0; i < numStrings; i++) { bld.setLength(0); - + int numComps = rnd.nextInt(5) + 1; - + for (int z = 0; z < numComps; z++) { if (z > 0) { bld.append(' '); } bld.append(prefix); - + int len = rnd.nextInt(20) + 10; for (int k = 0; k < len; k++) { char c = (char) (rnd.nextInt(80) + 40); bld.append(c); } } - + String str = bld.toString(); - + wrt.write(str); wrt.newLine(); } @@ -399,12 +401,12 @@ public class MassiveStringValueSorting { wrt.close(); } } - + return f; } - + // -------------------------------------------------------------------------------------------- - + public static void main(String[] args) { new MassiveStringValueSorting().testStringValueSorting(); new MassiveStringValueSorting().testStringValueTuplesSorting(); http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java index ee3b4b2..0b8fd1c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java @@ -38,14 +38,14 @@ import static org.junit.Assert.fail; * with a parallelism of 100. */ public class NotSoMiniClusterIterations { - + private static final int PARALLELISM = 100; - + public static void main(String[] args) { if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) { throw new RuntimeException("This test program needs to run with at least 5GB of heap space."); } - + LocalFlinkMiniCluster cluster = null; try { @@ -55,7 +55,7 @@ public class NotSoMiniClusterIterations { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024); - + config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1);