http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java new file mode 100644 index 0000000..dfb315e --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class JoinWithEdgesITCase extends MultipleProgramsTestBase { + + public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithEdgesInputDataset() throws Exception { + /* + * Test joinWithEdges with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,90\n" + + "5,1,102\n"; + } + + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithLessElementsDifferentType() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithEdges with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithEdgesOnSource() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,69\n" + + "4,5,90\n" + + "5,1,102\n"; + } + + @Test + public void testOnSourceWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,20\n" + + "2,3,60\n" + + "3,4,80\n" + + "3,5,80\n" + + "4,5,120\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithCustom() throws Exception { + /* + * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,10\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,40\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithEdgesOnTarget() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,80\n" + + "5,1,102\n"; + } + + @Test + public void testWithOnTargetWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,40\n" + + "2,3,40\n" + + "3,4,80\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,140\n"; + } + + @Test + public void testOnTargetWithCustom() throws Exception { + /* + * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,20\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> { + public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Boolean>(edge.getSource(), + edge.getTarget(), true); + } + } + + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + } + + @SuppressWarnings("serial") + private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getSource(), true); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getTarget(), true); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java new file mode 100644 index 0000000..28a0441 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.utils.VertexToTuple2Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class JoinWithVerticesITCase extends MultipleProgramsTestBase { + + public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testJoinWithVertexSet() throws Exception { + /* + * Test joinWithVertices with the input DataSet parameter identical + * to the vertex DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; + } + + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testWithDifferentType() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testWithDifferentKeys() throws Exception { + /* + * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), + new ProjectSecondMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithVertices with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), + new CustomValueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Boolean>(vertex.getId(), true); + } + } + + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + } + + @SuppressWarnings("serial") + private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java new file mode 100644 index 0000000..a5c01cf --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MapEdgesITCase extends MultipleProgramsTestBase { + + public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapEdges() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,13\n" + + "1,3,14\n" + + "2,3,24\n" + + "3,4,35\n" + + "3,5,36\n" + + "4,5,46\n" + + "5,1,52\n"; + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapEdges() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,string(12)\n" + + "1,3,string(13)\n" + + "2,3,string(23)\n" + + "3,4,string(34)\n" + + "3,5,string(35)\n" + + "4,5,string(45)\n" + + "5,1,string(51)\n"; + } + + @Test + public void testWithTuple1Type() throws Exception { + /* + * Test mapEdges() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12)\n" + + "1,3,(13)\n" + + "2,3,(23)\n" + + "3,4,(34)\n" + + "3,5,(35)\n" + + "4,5,(45)\n" + + "5,1,(51)\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(T,12)\n" + + "1,3,(T,13)\n" + + "2,3,(T,23)\n" + + "3,4,(T,34)\n" + + "3,5,(T,35)\n" + + "4,5,(T,45)\n" + + "5,1,(T,51)\n"; + } + + @Test + public void testWithParametrizedCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( + new ToCustomParametrizedTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12.0,12)\n" + + "1,3,(13.0,13)\n" + + "2,3,(23.0,23)\n" + + "3,4,(34.0,34)\n" + + "3,5,(35.0,35)\n" + + "4,5,(45.0,45)\n" + + "5,1,(51.0,51)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> { + public Long map(Edge<Long, Long> edge) throws Exception { + return edge.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> { + public String map(Edge<Long, Long> edge) throws Exception { + return String.format("string(%d)", edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(edge.getValue()); + return tupleValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> { + public DummyCustomType map(Edge<Long, Long> edge) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(edge.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(edge.getValue().intValue()); + dummyValue.setTField(new Double(edge.getValue())); + return dummyValue; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java new file mode 100644 index 0000000..0d92fc9 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MapVerticesITCase extends MultipleProgramsTestBase { + + public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapVertices() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,6\n"; + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapVertices() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,one\n" + + "2,two\n" + + "3,three\n" + + "4,four\n" + + "5,five\n"; + } + + @Test + public void testWithtuple1Value() throws Exception { + /* + * Test mapVertices() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(1)\n" + + "2,(2)\n" + + "3,(3)\n" + + "4,(4)\n" + + "5,(5)\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapVertices() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(T,1)\n" + + "2,(T,2)\n" + + "3,(T,3)\n" + + "4,(T,4)\n" + + "5,(T,5)\n"; + } + + @Test + public void testWithCustomParametrizedType() throws Exception { + /* + * Test mapVertices() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( + new ToCustomParametrizedTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(1.0,1)\n" + + "2,(2.0,2)\n" + + "3,(3.0,3)\n" + + "4,(4.0,4)\n" + + "5,(5.0,5)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> { + public Long map(Vertex<Long, Long> value) throws Exception { + return value.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> { + public String map(Vertex<Long, Long> vertex) throws Exception { + String stringValue; + if (vertex.getValue() == 1) { + stringValue = "one"; + } + else if (vertex.getValue() == 2) { + stringValue = "two"; + } + else if (vertex.getValue() == 3) { + stringValue = "three"; + } + else if (vertex.getValue() == 4) { + stringValue = "four"; + } + else if (vertex.getValue() == 5) { + stringValue = "five"; + } + else { + stringValue = ""; + } + return stringValue; + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(vertex.getValue()); + return tupleValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> { + public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(vertex.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(vertex.getValue().intValue()); + dummyValue.setTField(new Double(vertex.getValue())); + return dummyValue; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java new file mode 100644 index 0000000..73dec2a --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.EdgesFunctionWithVertexValue; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { + + public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testLowestWeightOutNeighbor() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testLowestWeightInNeighbor() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + + @Test + public void testMaxWeightEdge() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @Test + public void testLowestWeightOutNeighborNoValue() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testLowestWeightInNeighborNoValue() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + + @Test + public void testMaxWeightAllNeighbors() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getTarget(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MIN_VALUE; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() > weight) { + weight = edge.getValue(); + } + } + return new Tuple2<Long, Long>(v.getId(), weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getTarget(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MIN_VALUE; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() > weight) { + weight = edge.f1.getValue(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getSource(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getSource(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java new file mode 100644 index 0000000..c1e982f --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import java.util.Iterator; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { + + public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testSumOfOutNeighbors() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighbors() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfOAllNeighbors() throws Exception { + /* + * Get the sum of all neighbor values + * including own vertex value + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,11\n" + + "2,6\n" + + "3,15\n" + + "4,12\n" + + "5,13\n"; + } + + @Test + public void testSumOfOutNeighborsNoValue() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighborsNoValue() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfAllNeighborsNoValue() throws Exception { + /* + * Get the sum of all neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,4\n" + + "3,12\n" + + "4,8\n" + + "5,8\n"; + } + + @SuppressWarnings("serial") + private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } +} \ No newline at end of file
