[gelly] refactored tests; removed duplicate data from TestGraphUtils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1e03062 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1e03062 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1e03062 Branch: refs/heads/master Commit: e1e03062ccab7db0534a866fa5a984095e2b5eef Parents: b529b62 Author: vasia <[email protected]> Authored: Wed Mar 4 15:49:00 2015 +0100 Committer: Vasia Kalavri <[email protected]> Committed: Wed Mar 4 21:31:56 2015 +0100 ---------------------------------------------------------------------- .../graph/example/LabelPropagationExample.java | 9 +- .../SingleSourceShortestPathsExample.java | 4 +- .../apache/flink/graph/test/DegreesITCase.java | 171 ------ .../flink/graph/test/FromCollectionITCase.java | 120 ----- .../flink/graph/test/GraphCreationITCase.java | 170 ------ .../test/GraphCreationWithMapperITCase.java | 158 ------ .../flink/graph/test/GraphMutationsITCase.java | 273 ---------- .../flink/graph/test/GraphOperationsITCase.java | 267 ---------- .../flink/graph/test/JoinWithEdgesITCase.java | 519 ------------------ .../graph/test/JoinWithVerticesITCase.java | 218 -------- .../test/LabelPropagationExampleITCase.java | 176 ------- .../apache/flink/graph/test/MapEdgesITCase.java | 223 -------- .../flink/graph/test/MapVerticesITCase.java | 233 --------- .../graph/test/ReduceOnEdgesMethodsITCase.java | 317 ----------- .../test/ReduceOnNeighborMethodsITCase.java | 303 ----------- .../apache/flink/graph/test/TestGraphUtils.java | 62 ++- .../example/LabelPropagationExampleITCase.java | 143 +++++ .../graph/test/operations/DegreesITCase.java | 172 ++++++ .../test/operations/FromCollectionITCase.java | 121 +++++ .../test/operations/GraphCreationITCase.java | 171 ++++++ .../GraphCreationWithMapperITCase.java | 159 ++++++ .../test/operations/GraphMutationsITCase.java | 274 ++++++++++ .../test/operations/GraphOperationsITCase.java | 268 ++++++++++ .../test/operations/JoinWithEdgesITCase.java | 520 +++++++++++++++++++ .../test/operations/JoinWithVerticesITCase.java | 219 ++++++++ .../graph/test/operations/MapEdgesITCase.java | 224 ++++++++ .../test/operations/MapVerticesITCase.java | 234 +++++++++ .../operations/ReduceOnEdgesMethodsITCase.java | 318 ++++++++++++ .../ReduceOnNeighborMethodsITCase.java | 304 +++++++++++ 29 files changed, 3174 insertions(+), 3176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java index 78cb5d5..e399b3f 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java @@ -85,8 +85,8 @@ public class LabelPropagationExample implements ProgramDescription { private static boolean parseParameters(String[] args) { if(args.length > 0) { - if(args.length != 5) { - System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>"); + if(args.length != 4) { + System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>"); return false; } @@ -94,13 +94,12 @@ public class LabelPropagationExample implements ProgramDescription { vertexInputPath = args[0]; edgeInputPath = args[1]; outputPath = args[2]; - numVertices = Integer.parseInt(args[3]); - maxIterations = Integer.parseInt(args[4]); + maxIterations = Integer.parseInt(args[3]); } else { System.out.println("Executing LabelPropagation example with default parameters and built-in default data."); System.out.println(" Provide parameters to read input data from files."); System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num vertices> <num iterations>"); + System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>"); } return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java index c590f30..6c85397 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java @@ -100,6 +100,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription { return true; } + @SuppressWarnings("serial") private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(verticesInputPath) @@ -119,6 +120,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription { } } + @SuppressWarnings("serial") private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { if (fileOutput) { return env.readCsvFile(edgesInputPath) @@ -128,7 +130,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription { @Override public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception { - return new Edge(tuple3.f0, tuple3.f1, tuple3.f2); + return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2); } }); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java deleted file mode 100644 index 96a6d20..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class DegreesITCase extends MultipleProgramsTestBase { - - public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testOutDegrees() throws Exception { - /* - * Test outDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,1\n" + - "3,2\n" + - "4,1\n" + - "5,1\n"; - } - - @Test - public void testOutDegreesWithNoOutEdges() throws Exception { - /* - * Test outDegrees() no outgoing edges - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,3\n" + - "2,1\n" + - "3,1\n" + - "4,1\n" + - "5,0\n"; - } - - @Test - public void testInDegrees() throws Exception { - /* - * Test inDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,1\n" + - "2,1\n" + - "3,2\n" + - "4,1\n" + - "5,2\n"; - } - - @Test - public void testInDegreesWithNoInEdge() throws Exception { - /* - * Test inDegrees() no ingoing edge - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0\n" + - "2,1\n" + - "3,1\n" + - "4,1\n" + - "5,3\n"; - } - - @Test - public void testGetDegrees() throws Exception { - /* - * Test getDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,3\n" + - "2,2\n" + - "3,4\n" + - "4,2\n" + - "5,3\n"; - } - - @Test - public void testGetDegreesWithDisconnectedData() throws Exception { - /* - * Test getDegrees() with disconnected data - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, Long> graph = - Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,1\n" + - "3,0\n" + - "4,1\n" + - "5,0\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java deleted file mode 100644 index 5259143..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class FromCollectionITCase extends MultipleProgramsTestBase { - - public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testFromCollectionVerticesEdges() throws Exception { - /* - * Test fromCollection(vertices, edges): - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), - TestGraphUtils.getLongLongEdges(), env); - - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testFromCollectionEdgesNoInitialValue() throws Exception { - /* - * Test fromCollection(edges) with no initial value for the vertices - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), - env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; - } - - @Test - public void testFromCollectionEdgesWithInitialValue() throws Exception { - /* - * Test fromCollection(edges) with vertices initialised by a - * function that takes the id and doubles it - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), - new InitVerticesMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,8\n" + - "5,10\n"; - } - - @SuppressWarnings("serial") - private static final class InitVerticesMapper implements MapFunction<Long, Long> { - public Long map(Long vertexId) { - return vertexId * 2; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java deleted file mode 100644 index 4cbdd90..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.validation.InvalidVertexIdsValidator; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GraphCreationITCase extends MultipleProgramsTestBase { - - public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testCreateWithoutVertexValues() throws Exception { - /* - * Test create() with edge dataset and no vertex values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; - } - - @Test - public void testCreateWithMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns the id as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), - new AssignIdAsValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testCreateWithCustomVertexValue() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(2.0,0)\n" + - "2,(4.0,1)\n" + - "3,(6.0,2)\n" + - "4,(8.0,3)\n" + - "5,(10.0,4)\n"; - } - - @Test - public void testValidate() throws Exception { - /* - * Test validate(): - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env); - DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); - - result.writeAsText(resultPath); - env.execute(); - - expectedResult = "true\n"; - } - - @Test - public void testValidateWithInvalidIds() throws Exception { - /* - * Test validate() - invalid vertex ids - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env); - DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); - result.writeAsText(resultPath); - env.execute(); - - expectedResult = "false\n"; - } - - @SuppressWarnings("serial") - private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> { - public Long map(Long vertexId) { - return vertexId; - } - } - - @SuppressWarnings("serial") - private static final class AssignCustomVertexValueMapper implements - MapFunction<Long, DummyCustomParameterizedType<Double>> { - - DummyCustomParameterizedType<Double> dummyValue = - new DummyCustomParameterizedType<Double>(); - - public DummyCustomParameterizedType<Double> map(Long vertexId) { - dummyValue.setIntField(vertexId.intValue()-1); - dummyValue.setTField(vertexId*2.0); - return dummyValue; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java deleted file mode 100644 index 24f7c82..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase { - - public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithDoubleValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a double constant as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), - new AssignDoubleValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; - } - - @Test - public void testWithTuple2ValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a Tuple2 as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(2,42)\n" + - "2,(4,42)\n" + - "3,(6,42)\n" + - "4,(8,42)\n" + - "5,(10,42)\n"; - } - - @Test - public void testWithConstantValueMapper() throws Exception { - /* - * Test create() with edge dataset with String key type - * and a mapper that assigns a double constant as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), - new AssignDoubleConstantMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; - } - - @Test - public void testWithDCustomValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a custom vertex value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(F,0)\n" + - "2,(F,1)\n" + - "3,(F,2)\n" + - "4,(F,3)\n" + - "5,(F,4)\n"; - } - - @SuppressWarnings("serial") - private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> { - public Double map(Long value) { - return 0.1d; - } - } - - @SuppressWarnings("serial") - private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Long vertexId) { - return new Tuple2<Long, Long>(vertexId*2, 42l); - } - } - - @SuppressWarnings("serial") - private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> { - public Double map(String value) { - return 0.1d; - } - } - - @SuppressWarnings("serial") - private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> { - public DummyCustomType map(Long vertexId) { - return new DummyCustomType(vertexId.intValue()-1, false); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java deleted file mode 100644 index 3af8943..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GraphMutationsITCase extends MultipleProgramsTestBase { - - public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testAddVertex() throws Exception { - /* - * Test addVertex() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - edges.add(new Edge<Long, Long>(6L, 1L, 61L)); - graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - - @Test - public void testAddVertexExisting() throws Exception { - /* - * Test addVertex() -- add an existing vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - edges.add(new Edge<Long, Long>(1L, 5L, 15L)); - graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "1,5,15\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testAddVertexNoEdges() throws Exception { - /* - * Test addVertex() -- add vertex with empty edge set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n" + - "6,6\n"; - } - - @Test - public void testRemoveVertex() throws Exception { - /* - * Test removeVertex() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n"; - } - - @Test - public void testRemoveInvalidVertex() throws Exception { - /* - * Test removeVertex() -- remove an invalid vertex - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testAddEdge() throws Exception { - /* - * Test addEdge() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L), - 61L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - - @Test - public void testAddExistingEdge() throws Exception { - /* - * Test addEdge() -- add already existing edge - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L), - 12L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testRemoveVEdge() throws Exception { - /* - * Test removeEdge() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n"; - } - - @Test - public void testRemoveInvalidEdge() throws Exception { - /* - * Test removeEdge() -- invalid edge - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java deleted file mode 100644 index f194a60..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class GraphOperationsITCase extends MultipleProgramsTestBase { - - public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testUndirected() throws Exception { - /* - * Test getUndirected() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getUndirected().getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,12\n" + "2,1,12\n" + - "1,3,13\n" + "3,1,13\n" + - "2,3,23\n" + "3,2,23\n" + - "3,4,34\n" + "4,3,34\n" + - "3,5,35\n" + "5,3,35\n" + - "4,5,45\n" + "5,4,45\n" + - "5,1,51\n" + "1,5,51\n"; - } - - @Test - public void testReverse() throws Exception { - /* - * Test reverse() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.reverse().getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "2,1,12\n" + - "3,1,13\n" + - "3,2,23\n" + - "4,3,34\n" + - "5,3,35\n" + - "5,4,45\n" + - "1,5,51\n"; - } - - @SuppressWarnings("serial") - @Test - public void testSubGraph() throws Exception { - /* - * Test subgraph: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.subgraph(new FilterFunction<Vertex<Long, Long>>() { - public boolean filter(Vertex<Long, Long> vertex) throws Exception { - return (vertex.getValue() > 2); - } - }, - new FilterFunction<Edge<Long, Long>>() { - public boolean filter(Edge<Long, Long> edge) throws Exception { - return (edge.getValue() > 34); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,5,35\n" + - "4,5,45\n"; - } - - @SuppressWarnings("serial") - @Test - public void testFilterVertices() throws Exception { - /* - * Test filterOnVertices: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() { - public boolean filter(Vertex<Long, Long> vertex) throws Exception { - return (vertex.getValue() > 2); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n"; - } - - @SuppressWarnings("serial") - @Test - public void testFilterEdges() throws Exception { - /* - * Test filterOnEdges: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() { - public boolean filter(Edge<Long, Long> edge) throws Exception { - return (edge.getValue() > 34); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testNumberOfVertices() throws Exception { - /* - * Test numberOfVertices() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfVertices().writeAsText(resultPath); - - env.execute(); - expectedResult = "5"; - } - - @Test - public void testNumberOfEdges() throws Exception { - /* - * Test numberOfEdges() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfEdges().writeAsText(resultPath); - - env.execute(); - expectedResult = "7"; - } - - @Test - public void testVertexIds() throws Exception { - /* - * Test getVertexIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertexIds().writeAsText(resultPath); - - env.execute(); - expectedResult = "1\n2\n3\n4\n5\n"; - } - - @Test - public void testEdgesIds() throws Exception { - /* - * Test getEdgeIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getEdgeIds().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "1,2\n" + "1,3\n" + - "2,3\n" + "3,4\n" + - "3,5\n" + "4,5\n" + - "5,1\n"; - } - - @Test - public void testUnion() throws Exception { - /* - * Test union() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - - vertices.add(new Vertex<Long, Long>(6L, 6L)); - edges.add(new Edge<Long, Long>(6L, 1L, 61L)); - - graph = graph.union(Graph.fromCollection(vertices, edges, env)); - - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java deleted file mode 100644 index 6f4f6a8..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.utils.EdgeToTuple3Map; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class JoinWithEdgesITCase extends MultipleProgramsTestBase { - - public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithEdgesInputDataset() throws Exception { - /* - * Test joinWithEdges with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() - .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,90\n" + - "5,1,102\n"; - } - - @Test - public void testWithLessElements() throws Exception { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithLessElementsDifferentType() throws Exception { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test joinWithEdges with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,20\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithEdgesOnSource() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() - .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,69\n" + - "4,5,90\n" + - "5,1,102\n"; - } - - @Test - public void testOnSourceWithLessElements() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithDifferentType() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,20\n" + - "1,3,20\n" + - "2,3,60\n" + - "3,4,80\n" + - "3,5,80\n" + - "4,5,120\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithCustom() throws Exception { - /* - * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,10\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,40\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithEdgesOnTarget() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() - .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,80\n" + - "5,1,102\n"; - } - - @Test - public void testWithOnTargetWithLessElements() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnTargetWithDifferentType() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnTargetWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,20\n" + - "1,3,40\n" + - "2,3,40\n" + - "3,4,80\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,140\n"; - } - - @Test - public void testOnTargetWithCustom() throws Exception { - /* - * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,20\n" + - "2,3,20\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @SuppressWarnings("serial") - private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> { - public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple3<Long, Long, Boolean>(edge.getSource(), - edge.getTarget(), true); - } - } - - @SuppressWarnings("serial") - private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - } - - @SuppressWarnings("serial") - private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1 * 2; - } - } - - @SuppressWarnings("serial") - private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { - public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - } - - @SuppressWarnings("serial") - private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getSource(), true); - } - } - - @SuppressWarnings("serial") - private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getTarget(), true); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java deleted file mode 100644 index 0574265..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.utils.VertexToTuple2Map; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class JoinWithVerticesITCase extends MultipleProgramsTestBase { - - public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testJoinWithVertexSet() throws Exception { - /* - * Test joinWithVertices with the input DataSet parameter identical - * to the vertex DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() - .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,8\n" + - "5,10\n"; - } - - @Test - public void testWithLessElements() throws Exception { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testWithDifferentType() throws Exception { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testWithDifferentKeys() throws Exception { - /* - * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), - new ProjectSecondMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test joinWithVertices with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), - new CustomValueMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - - @SuppressWarnings("serial") - private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Boolean>(vertex.getId(), true); - } - } - - @SuppressWarnings("serial") - private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - } - - @SuppressWarnings("serial") - private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { - public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java deleted file mode 100755 index dfb0f3f..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/LabelPropagationExampleITCase.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.example.LabelPropagationExample; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class LabelPropagationExampleITCase extends MultipleProgramsTestBase { - - public LabelPropagationExampleITCase(ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testSingleIteration() throws Exception { - /* - * Test one iteration of label propagation example with a simple graph - */ - - final String vertices = "1 10\n" + - "2 10\n" + - "3 30\n" + - "4 40\n" + - "5 40\n" + - "6 40\n" + - "7 70\n"; - - final String edges = "1 3\n" + - "2 3\n" + - "4 7\n" + - "5 7\n" + - "6 7\n" + - "7 3\n"; - - String verticesPath = createTempFile(vertices); - String edgesPath = createTempFile(edges); - - LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "7", "1"}); - - expectedResult = "1,10\n" + - "2,10\n" + - "3,10\n" + - "4,40\n" + - "5,40\n" + - "6,40\n" + - "7,40\n"; - } - - @Test - public void testTieBreaker() throws Exception { - /* - * Test the label propagation example where a tie must be broken - */ - - final String vertices = "1 10\n" + - "2 10\n" + - "3 10\n" + - "4 10\n" + - "5 0\n" + - "6 20\n" + - "7 20\n" + - "8 20\n" + - "9 20\n"; - - final String edges = "1 5\n" + - "2 5\n" + - "3 5\n" + - "4 5\n" + - "6 5\n" + - "7 5\n" + - "8 5\n" + - "9 5\n"; - - String verticesPath = createTempFile(vertices); - String edgesPath = createTempFile(edges); - - LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "9", "1"}); - - expectedResult = "1,10\n" + - "2,10\n" + - "3,10\n" + - "4,10\n" + - "5,20\n" + - "6,20\n" + - "7,20\n" + - "8,20\n" + - "9,20\n"; - } - - @Test - public void testTermination() throws Exception { - /* - * Test the label propagation example where the algorithm terminates on the first iteration - */ - - final String vertices = "1 10\n" + - "2 10\n" + - "3 10\n" + - "4 40\n" + - "5 40\n" + - "6 40\n"; - - final String edges = "1 2\n" + - "2 3\n" + - "3 1\n" + - "4 5\n" + - "5 6\n" + - "6 4\n"; - - String verticesPath = createTempFile(vertices); - String edgesPath = createTempFile(edges); - - LabelPropagationExample.main(new String[]{verticesPath, edgesPath, resultPath, "6", "2"}); - - expectedResult = "1,10\n" + - "2,10\n" + - "3,10\n" + - "4,40\n" + - "5,40\n" + - "6,40\n"; - } - - // ------------------------------------------------------------------------- - // Util methods - // ------------------------------------------------------------------------- - - private String createTempFile(final String rows) throws Exception { - File tempFile = tempFolder.newFile(); - Files.write(rows, tempFile, Charsets.UTF_8); - return tempFile.toURI().toString(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java deleted file mode 100644 index f7a585d..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class MapEdgesITCase extends MultipleProgramsTestBase { - - public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithSameValue() throws Exception { - /* - * Test mapEdges() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,13\n" + - "1,3,14\n" + - "2,3,24\n" + - "3,4,35\n" + - "3,5,36\n" + - "4,5,46\n" + - "5,1,52\n"; - } - - @Test - public void testWithStringValue() throws Exception { - /* - * Test mapEdges() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,string(12)\n" + - "1,3,string(13)\n" + - "2,3,string(23)\n" + - "3,4,string(34)\n" + - "3,5,string(35)\n" + - "4,5,string(45)\n" + - "5,1,string(51)\n"; - } - - @Test - public void testWithTuple1Type() throws Exception { - /* - * Test mapEdges() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(12)\n" + - "1,3,(13)\n" + - "2,3,(23)\n" + - "3,4,(34)\n" + - "3,5,(35)\n" + - "4,5,(45)\n" + - "5,1,(51)\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test mapEdges() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(T,12)\n" + - "1,3,(T,13)\n" + - "2,3,(T,23)\n" + - "3,4,(T,34)\n" + - "3,5,(T,35)\n" + - "4,5,(T,45)\n" + - "5,1,(T,51)\n"; - } - - @Test - public void testWithParametrizedCustomType() throws Exception { - /* - * Test mapEdges() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( - new ToCustomParametrizedTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(12.0,12)\n" + - "1,3,(13.0,13)\n" + - "2,3,(23.0,23)\n" + - "3,4,(34.0,34)\n" + - "3,5,(35.0,35)\n" + - "4,5,(45.0,45)\n" + - "5,1,(51.0,51)\n"; - } - - @SuppressWarnings("serial") - private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> { - public Long map(Edge<Long, Long> edge) throws Exception { - return edge.getValue()+1; - } - } - - @SuppressWarnings("serial") - private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> { - public String map(Edge<Long, Long> edge) throws Exception { - return String.format("string(%d)", edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> { - public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { - Tuple1<Long> tupleValue = new Tuple1<Long>(); - tupleValue.setFields(edge.getValue()); - return tupleValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> { - public DummyCustomType map(Edge<Long, Long> edge) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(edge.getValue().intValue()); - return dummyValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, - DummyCustomParameterizedType<Double>> { - - public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { - DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); - dummyValue.setIntField(edge.getValue().intValue()); - dummyValue.setTField(new Double(edge.getValue())); - return dummyValue; - } - } -} \ No newline at end of file
