http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java deleted file mode 100644 index 0a5732c..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -// -//package org.apache.flink.test.exampleJavaPrograms; -// -////import org.apache.flink.examples.java.wordcount.WordCountPOJO; -//import org.apache.flink.test.testdata.WordCountData; -//import org.apache.flink.test.util.JavaProgramTestBase; -//import org.junit.Ignore; -// -//@Ignore -//public class WordCountPOJOITCase extends JavaProgramTestBase { -// -// protected String textPath; -// protected String resultPath; -// -// -// @Override -// protected void preSubmit() throws Exception { -// textPath = createTempFile("text.txt", WordCountData.TEXT); -// resultPath = getTempDirPath("result"); -// } -// -// @Override -// protected void postSubmit() throws Exception { -// compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); -// } -// -// @Override -// protected void testProgram() throws Exception { -// WordCountPOJO.main(new String[]{textPath, resultPath}); -// } -//}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java new file mode 100644 index 0000000..7d20597 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSimplePOJOITCase.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.exampleJavaPrograms; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + + +public class WordCountSimplePOJOITCase extends JavaProgramTestBase implements Serializable { + private static final long serialVersionUID = 1L; + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<String> text = env.readTextFile(textPath); + + DataSet<WC> counts = text + .flatMap(new Tokenizer()) + .groupBy("word") + .reduce(new ReduceFunction<WC>() { + private static final long serialVersionUID = 1L; + + public WC reduce(WC value1, WC value2) { + return new WC(value1.word, value1.count + value2.count); + } + }); + + counts.writeAsText(resultPath); + + env.execute("WordCount with custom data types example"); + } + + public static final class Tokenizer implements FlatMapFunction<String, WC> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<WC> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new WC(token, 1)); + } + } + } + } + + public 
static class WC { + public WC() {} + public WC(String w, int c) { + word = w; + count = c; + } + public String word; + public int count; + @Override + public String toString() { + return word + " " + count; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java index 9701086..4f00b1d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.test.exampleScalaPrograms; -import org.apache.flink.examples.scala.graph.EnumTrianglesOpt; +import org.apache.flink.examples.java.graph.EnumTrianglesOpt; import org.apache.flink.test.testdata.EnumTriangleData; import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java index d14c2e1..42c6a8e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java @@ -18,19 +18,19 @@ package org.apache.flink.test.exampleScalaPrograms; +import java.io.FileNotFoundException; +import java.io.IOException; +import 
java.util.Collection; +import java.util.LinkedList; + import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.scala.graph.PageRankBasic; +import org.apache.flink.examples.java.graph.PageRankBasic; import org.apache.flink.test.testdata.PageRankData; import org.apache.flink.test.util.JavaProgramTestBase; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - @RunWith(Parameterized.class) public class PageRankITCase extends JavaProgramTestBase { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java index ce31ea2..e607ba7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListComparator.java @@ -32,8 +32,6 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert private long reference; - private Comparable[] extractedKey = new Comparable[1]; - private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)}; @Override @@ -136,13 +134,13 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert } @Override - public Object[] extractKeys(VertexWithAdjacencyList record) 
{ - extractedKey[0] = record.getVertexID(); - return extractedKey; + public int extractKeys(Object record, Object[] target, int index) { + target[index] = ((VertexWithAdjacencyList) record).getVertexID(); + return 1; } @Override - public TypeComparator[] getComparators() { + public TypeComparator[] getFlatComparators() { return comparators; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java index b9503ae..dd9d4dd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingComparator.java @@ -32,8 +32,6 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve private long reference; - private Comparable[] extractedKey = new Comparable[1]; - private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)}; @Override @@ -141,13 +139,13 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve } @Override - public Object[] extractKeys(VertexWithRankAndDangling record) { - extractedKey[0] = record.getVertexID(); - return extractedKey; + public int extractKeys(Object record, Object[] target, int index) { + target[index] = ((VertexWithRankAndDangling) record).getVertexID(); + return 1; } @Override - public TypeComparator[] getComparators() { + public TypeComparator[] 
getFlatComparators() { return comparators; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java index cfe4a9f..b63f724 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankComparator.java @@ -32,8 +32,6 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan private long reference; - private Comparable[] extractedKey = new Comparable[1]; - private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)}; @Override @@ -139,13 +137,13 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan } @Override - public Object[] extractKeys(VertexWithRank record) { - extractedKey[0] = record.getVertexID(); - return extractedKey; + public int extractKeys(Object record, Object[] target, int index) { + target[index] = ((VertexWithRank) record).getVertexID(); + return 1; } @Override - public TypeComparator[] getComparators() { + public TypeComparator[] getFlatComparators() { return comparators; } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java index ce9aa65..f0e89df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java @@ -26,18 +26,22 @@ import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; +import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -45,7 +49,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class CoGroupITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 9; + private static int NUM_PROGRAMS = 13; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -356,6 +360,149 @@ public class CoGroupITCase extends JavaProgramTestBase { "5,3,HIJ\n" + "5,3,IJK\n"; } + case 10: { + /* + * CoGroup on two custom type inputs using expression keys + */ + + 
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup()); + + coGroupDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1,0,test\n" + + "2,6,test\n" + + "3,24,test\n" + + "4,60,test\n" + + "5,120,test\n" + + "6,210,test\n"; + } + case 11: { + /* + * CoGroup of a POJO input with a Tuple7 input: expression key "nestedPojo.longNumber" against tuple field position 6 + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where("nestedPojo.longNumber").equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + }); + coGroupDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + } + case 12: { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is unnecessarily complicated (Tuple1) ;) + */ + + final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector<POJO, Tuple1<Long>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple1<Long> getKey(POJO value) + throws Exception { + return new Tuple1<Long>(value.nestedPojo.longNumber); + } + }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + }); + coGroupDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + } + case 13: { + /* + * CoGroup field-selector (expression keys) + key selector function + * The key selector is simple here + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<CustomType> coGroupDs = ds.coGroup(ds2) + .where(new KeySelector<POJO, Long>() { + private static final long serialVersionUID = 1L; + + @Override + public Long getKey(POJO value) + throws Exception { + return value.nestedPojo.longNumber; + } + 
}).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() { + private static final long serialVersionUID = 1L; + + @Override + public void coGroup( + Iterable<POJO> first, + Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, + Collector<CustomType> out) throws Exception { + for(POJO p : first) { + for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) { + Assert.assertTrue(p.nestedPojo.longNumber == t.f6); + out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink")); + } + } + } + }); + coGroupDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "-1,20000,Flink\n" + + "-1,10000,Flink\n" + + "-1,30000,Flink\n"; + } + default: throw new IllegalArgumentException("Invalid program id"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 00b68fc..fce56d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -35,7 +35,12 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTuple; +import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.FromTupleWithCTor; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoContainingTupleAndWritable; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; import org.junit.runner.RunWith; @@ -48,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) public class GroupReduceITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 15; + private static int NUM_PROGRAMS = 19; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -406,7 +411,6 @@ public class GroupReduceITCase extends JavaProgramTestBase { /* * check correctness of groupReduce with descending group sort */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setDegreeOfParallelism(1); @@ -484,6 +488,117 @@ public class GroupReduceITCase extends JavaProgramTestBase { "16,6,Comment#10\n"; } + case 16: { + /* + * Deep nesting test + * + null value in pojo + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<CrazyNested> ds = CollectionDataSets.getCrazyNestedDataSet(env); + DataSet<Tuple2<String, Integer>> reduceDs = ds.groupBy("nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal") + .reduceGroup(new GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void reduce(Iterable<CrazyNested> values, + Collector<Tuple2<String, Integer>> out) + throws Exception { + int c = 0; String n = null; + for(CrazyNested v : values) { + c++; // haha + n = v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal; + } + out.collect(new Tuple2<String, Integer>(n,c)); + }}); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return 
"aa,1\nbb,2\ncc,3\n"; + } + case 17: { + /* + * Test Pojo extending from tuple WITH custom fields + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<FromTupleWithCTor> ds = CollectionDataSets.getPojoExtendingFromTuple(env); + DataSet<Integer> reduceDs = ds.groupBy("special", "f2") + .reduceGroup(new GroupReduceFunction<FromTupleWithCTor, Integer>() { + private static final long serialVersionUID = 1L; + @Override + public void reduce(Iterable<FromTupleWithCTor> values, + Collector<Integer> out) + throws Exception { + int c = 0; + for(FromTuple v : values) { + c++; + } + out.collect(c); + }}); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "3\n2\n"; + } + case 18: { + /* + * Test Pojo containing a Writable and Tuples + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<PojoContainingTupleAndWritable> ds = CollectionDataSets.getPojoContainingTupleAndWritable(env); + DataSet<Integer> reduceDs = ds.groupBy("hadoopFan", "theTuple.*") // full tuple selection + .reduceGroup(new GroupReduceFunction<PojoContainingTupleAndWritable, Integer>() { + private static final long serialVersionUID = 1L; + @Override + public void reduce(Iterable<PojoContainingTupleAndWritable> values, + Collector<Integer> out) + throws Exception { + int c = 0; + for(PojoContainingTupleAndWritable v : values) { + c++; + } + out.collect(c); + }}); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "1\n5\n"; + } + case 19: { + /* + * Test Tuple containing pojos and regular fields + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple3<Integer,CrazyNested, POJO>> ds = CollectionDataSets.getTupleContainingPojos(env); + DataSet<Integer> reduceDs = ds.groupBy("f0", "f1.*") // nested full tuple selection + .reduceGroup(new GroupReduceFunction<Tuple3<Integer,CrazyNested, POJO>, 
Integer>() { + private static final long serialVersionUID = 1L; + @Override + public void reduce(Iterable<Tuple3<Integer,CrazyNested, POJO>> values, + Collector<Integer> out) + throws Exception { + int c = 0; + for(Tuple3<Integer,CrazyNested, POJO> v : values) { + c++; + } + out.collect(c); + }}); + + reduceDs.writeAsText(resultPath); + env.execute(); + + // return expected result + return "3\n1\n"; + } default: { throw new IllegalArgumentException("Invalid program id"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java index 433a076..e8d8be9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java @@ -31,9 +31,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; import org.junit.runner.RunWith; @@ -46,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; @RunWith(Parameterized.class) public class JoinITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 14; + private static int NUM_PROGRAMS = 21; private int 
curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -493,8 +495,175 @@ public class JoinITCase extends JavaProgramTestBase { "I am fine.,HIJ\n" + "I am fine.,IJK\n"; } + /** + * Joins with POJOs + */ + case 15: { + /* + * Join nested pojo against tuple (selected using a string) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6"); + + joinDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + } + + case 16: { + /* + * Join nested pojo against tuple (selected as an integer) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference! 
+ + joinDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + } + case 17: { + /* + * selecting multiple fields using expression language + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1"); + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + + } + case 18: { + /* + * nested into tuple + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2"); + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 
20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + + } + case 19: { + /* + * nested into tuple into pojo + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4"); + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + + } + case 20: { + /* + * Non-POJO test to verify that full-tuple keys are working. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); + DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = + ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer> + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "((1,1),one),((1,1),one)\n" + + "((2,2),two),((2,2),two)\n" + + "((3,3),three),((3,3),three)\n"; + + } + case 21: { + /* + * Non-POJO test to verify "nested" tuple-element selection. 
+ */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env); + DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env); + DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs = + ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer> + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "((1,1),one),((1,1),one)\n" + + "((2,2),two),((2,2),two)\n" + + "((3,3),three),((3,3),three)\n"; + + } + case 22: { + /* + * full pojo with full tuple + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env); + DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedPojoMatchingDataSet(env); + DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs = + ds1.join(ds2).where("*").equalTo("*"); + + joinDs.writeAsCsv(resultPath); + env.setDegreeOfParallelism(1); + env.execute(); + + // return expected result + return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" + + "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" + + "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n"; + } default: - throw new IllegalArgumentException("Invalid program id"); + throw new IllegalArgumentException("Invalid program id: "+progId); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java index afefef9..bf1d404 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java @@ -45,7 +45,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class PartitionITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 1; + private static int NUM_PROGRAMS = 3; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -111,7 +111,7 @@ public class PartitionITCase extends JavaProgramTestBase { "5\n" + "6\n"; } - case 2: { + case 1: { /* * Test hash partition by key selector */ @@ -141,7 +141,7 @@ public class PartitionITCase extends JavaProgramTestBase { "5\n" + "6\n"; } - case 1: { + case 2: { /* * Test forced rebalancing */ @@ -200,7 +200,7 @@ public class PartitionITCase extends JavaProgramTestBase { // return expected result return result.toString(); } - case 4: { + case 3: { /* * Test hash partition by key field and different DOP */ http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java index 10ea882..a1957f9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java @@ -42,7 +42,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class ReduceITCase extends 
JavaProgramTestBase { - private static int NUM_PROGRAMS = 9; + private static int NUM_PROGRAMS = 10; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -305,6 +305,33 @@ public class ReduceITCase extends JavaProgramTestBase { "5,29,0,P-),2\n" + "5,25,0,P-),3\n"; } + case 10: { + /* + * Case 2 with String-based field expression + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env); + DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds. + groupBy("f4","f0").reduce(new Tuple5Reduce()); + + reduceDs.writeAsCsv(resultPath); + env.execute(); + + // return expected result + return "1,1,0,Hallo,1\n" + + "2,3,2,Hallo Welt wie,1\n" + + "2,2,1,Hallo Welt,2\n" + + "3,9,0,P-),2\n" + + "3,6,5,BCD,3\n" + + "4,17,0,P-),1\n" + + "4,17,0,P-),2\n" + + "5,11,10,GHI,1\n" + + "5,29,0,P-),2\n" + + "5,25,0,P-),3\n"; + } + default: throw new IllegalArgumentException("Invalid program id"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 3ca8f31..b657545 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -24,11 +24,14 @@ import java.util.Collections; import java.util.List; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import 
org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.hadoop.io.IntWritable; /** * ####################################################################################################### @@ -136,6 +139,22 @@ public class CollectionDataSets { return env.fromCollection(data, type); } + public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) { + + List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<Tuple2<Tuple2<Integer, Integer>, String>>(); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(1,1), "one")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(2,2), "two")); + data.add(new Tuple2<Tuple2<Integer,Integer>, String>(new Tuple2<Integer, Integer>(3,3), "three")); + + TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new + TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>>( + new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO), + BasicTypeInfo.STRING_TYPE_INFO + ); + + return env.fromCollection(data, type); + } + public static DataSet<String> getStringDataSet(ExecutionEnvironment env) { List<String> data = new ArrayList<String>(); @@ -241,7 +260,150 @@ public class CollectionDataSets { public String toString() { return myInt+","+myLong+","+myString; } - + } + + public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedPojoMatchingDataSet(ExecutionEnvironment env) { + List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>(); + data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, 
"First",10, 100, 1000L, "One", 10000L)); + data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(2, "Second",20, 200, 2000L, "Two", 20000L)); + data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(3, "Third",30, 300, 3000L, "Three", 30000L)); + return env.fromCollection(data); + } + + public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) { + List<POJO> data = new ArrayList<POJO>(); + data.add(new POJO(1, "First",10, 100, 1000L, "One", 10000L)); + data.add(new POJO(2, "Second",20, 200, 2000L, "Two", 20000L)); + data.add(new POJO(3, "Third",30, 300, 3000L, "Three", 30000L)); + return env.fromCollection(data); + } + + public static class POJO { + public int number; + public String str; + public Tuple2<Integer, CustomType> nestedTupleWithCustom; + public NestedPojo nestedPojo; + public transient Long ignoreMe; + public POJO(int i0, String s0, + int i1, int i2, long l0, String s1, + long l1) { + this.number = i0; + this.str = s0; + this.nestedTupleWithCustom = new Tuple2<Integer, CustomType>(i1, new CustomType(i2, l0, s1)); + this.nestedPojo = new NestedPojo(); + this.nestedPojo.longNumber = l1; + } + public POJO() {} + @Override + public String toString() { + return number+" "+str+" "+nestedTupleWithCustom+" "+nestedPojo.longNumber; + } + } + + public static class NestedPojo { + public static Object ignoreMe; + public long longNumber; + public NestedPojo() {} + } + + public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) { + List<CrazyNested> data = new ArrayList<CrazyNested>(); + data.add(new CrazyNested("aa")); + data.add(new CrazyNested("bb")); + data.add(new CrazyNested("bb")); + data.add(new CrazyNested("cc")); + data.add(new CrazyNested("cc")); + data.add(new CrazyNested("cc")); + return env.fromCollection(data); + } + + public static class CrazyNested { + public CrazyNestedL1 nest_Lvl1; + public Long something; // test proper null-value handling + public CrazyNested() {} + 
public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values + this(set); + something = s; + nest_Lvl1.a = second; + } + public CrazyNested(String set) { + nest_Lvl1 = new CrazyNestedL1(); + nest_Lvl1.nest_Lvl2 = new CrazyNestedL2(); + nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3(); + nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4(); + nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set; + } + } + public static class CrazyNestedL1 { + public String a; + public int b; + public CrazyNestedL2 nest_Lvl2; + } + public static class CrazyNestedL2 { + public CrazyNestedL3 nest_Lvl3; + } + public static class CrazyNestedL3 { + public CrazyNestedL4 nest_Lvl4; + } + public static class CrazyNestedL4 { + public String f1nal; + } + + // Copied from TypeExtractorTest + public static class FromTuple extends Tuple3<String, String, Long> { + private static final long serialVersionUID = 1L; + public int special; + } + + public static class FromTupleWithCTor extends FromTuple { + public FromTupleWithCTor() {} + public FromTupleWithCTor(int special, long tupleField ) { + this.special = special; + this.setField(tupleField, 2); + } + } + public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) { + List<FromTupleWithCTor> data = new ArrayList<FromTupleWithCTor>(); + data.add(new FromTupleWithCTor(1, 10L)); // 3x + data.add(new FromTupleWithCTor(1, 10L)); + data.add(new FromTupleWithCTor(1, 10L)); + data.add(new FromTupleWithCTor(2, 20L)); // 2x + data.add(new FromTupleWithCTor(2, 20L)); + return env.fromCollection(data); + } + + public static class PojoContainingTupleAndWritable { + public int someInt; + public String someString; + public IntWritable hadoopFan; + public Tuple2<Long, Long> theTuple; + public PojoContainingTupleAndWritable() {} + public PojoContainingTupleAndWritable(int i, long l1, long l2) { + hadoopFan = new IntWritable(i); + someInt = i; + theTuple = new 
Tuple2<Long, Long>(l1, l2); + } + } + + public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) { + List<PojoContainingTupleAndWritable> data = new ArrayList<PojoContainingTupleAndWritable>(); + data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x + data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x + data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); + data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); + data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); + data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); + return env.fromCollection(data); + } + + public static DataSet<Tuple3<Integer,CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) { + List<Tuple3<Integer,CrazyNested, POJO>> data = new ArrayList<Tuple3<Integer,CrazyNested, POJO>>(); + data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 3x + data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); + data.add(new Tuple3<Integer,CrazyNested, POJO>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); + // POJO is not initialized according to the first two fields. 
+ data.add(new Tuple3<Integer,CrazyNested, POJO>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First",10, 100, 1000L, "One", 10000L) )); // 1x + return env.fromCollection(data); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java index 6dedcc1..16267f6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFie import org.apache.flink.types.DoubleValue; import org.apache.flink.types.IntValue; import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; /** * Cross PACT computes the distance of all data points to all cluster http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/926f835a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java index 1925a94..0821af5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java @@ -24,17 +24,20 @@ import java.util.Collection; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Program; -import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; - +import org.apache.flink.test.localDistributed.PackagedProgramEndToEndITCase; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.IterativeDataSet; +/** + * This class belongs to the @see {@link PackagedProgramEndToEndITCase} test + * + */ @SuppressWarnings("serial") public class KMeansForTest implements Program { @@ -80,12 +83,8 @@ public class KMeansForTest implements Program { .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") // count and sum point coordinates for each centroid .map(new CountAppender()) - .groupBy(new KeySelector<DummyTuple3IntPointLong, Integer>() { - @Override - public Integer getKey(DummyTuple3IntPointLong value) throws Exception { - return value.f0; - } - }).reduce(new CentroidAccumulator()) + // !test if key expressions are working! 
+ .groupBy("field0").reduce(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager()); @@ -228,16 +227,16 @@ public class KMeansForTest implements Program { // Use this so that we can check whether POJOs and the POJO comparator also work public static final class DummyTuple3IntPointLong { - public Integer f0; - public Point f1; - public Long f2; + public Integer field0; + public Point field1; + public Long field2; public DummyTuple3IntPointLong() {} DummyTuple3IntPointLong(Integer f0, Point f1, Long f2) { - this.f0 = f0; - this.f1 = f1; - this.f2 = f2; + this.field0 = f0; + this.field1 = f1; + this.field2 = f2; } } @@ -255,7 +254,7 @@ public class KMeansForTest implements Program { @Override public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) { - return new DummyTuple3IntPointLong(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2); + return new DummyTuple3IntPointLong(val1.field0, val1.field1.add(val2.field1), val1.field2 + val2.field2); } } @@ -264,7 +263,7 @@ public class KMeansForTest implements Program { @Override public Centroid map(DummyTuple3IntPointLong value) { - return new Centroid(value.f0, value.f1.div(value.f2)); + return new Centroid(value.field0, value.field1.div(value.field2)); } } }
