Repository: flink
Updated Branches:
  refs/heads/master 5350bc48a -> b0a7a1b81


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
new file mode 100644
index 0000000..3bb904e
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This base class handles the task of dividing the requested work into the
+ * appropriate number of blocks of near-equal size.
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public abstract class AbstractGeneratorFactory<T extends RandomGenerator>
+implements RandomGenerableFactory<T> {
+
+       // A large computation will run in parallel but blocks are generated on
+       // and distributed from a single node. This limit should be greater
+       // than the maximum expected parallelism.
+       public static final int MAXIMUM_BLOCK_COUNT = 1 << 20;
+
+       // This should be sufficiently large relative to the cost of 
instantiating
+       // and initializing the random generator and sufficiently small 
relative to
+       // the cost of generating random values.
+       protected abstract int getMinimumCyclesPerBlock();
+
+       protected abstract RandomGenerable<T> next();
+
+       @Override
+       public List<BlockInfo<T>> getRandomGenerables(long elementCount, int 
cyclesPerElement) {
+               long cycles = elementCount * cyclesPerElement;
+               int blockCount = Math.min((int) Math.ceil(cycles / (float) 
getMinimumCyclesPerBlock()), MAXIMUM_BLOCK_COUNT);
+
+               long elementsPerBlock = elementCount / blockCount;
+               long elementRemainder = elementCount % blockCount;
+
+               List<BlockInfo<T>> blocks = new ArrayList<>(blockCount);
+               long blockStart = 0;
+
+               for (int blockIndex = 0 ; blockIndex < blockCount ; 
blockIndex++) {
+                       if (blockIndex == blockCount - elementRemainder) {
+                               elementsPerBlock++;
+                       }
+
+                       RandomGenerable<T> randomGenerable = next();
+
+                       blocks.add(new BlockInfo<>(randomGenerable, blockIndex, 
blockCount, blockStart, elementsPerBlock));
+
+                       blockStart += elementsPerBlock;
+               }
+
+               return blocks;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
new file mode 100644
index 0000000..5e30a3f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+/**
+ * Defines a source of randomness and a unit of work.
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public class BlockInfo<T extends RandomGenerator> {
+
+       private final RandomGenerable<T> randomGenerable;
+
+       private final int blockIndex;
+
+       private final int blockCount;
+
+       private final long firstElement;
+
+       private final long elementCount;
+
+       public BlockInfo(RandomGenerable<T> randomGenerable, int blockIndex, 
int blockCount, long firstElement, long elementCount) {
+               this.randomGenerable = randomGenerable;
+               this.blockIndex = blockIndex;
+               this.blockCount = blockCount;
+               this.firstElement = firstElement;
+               this.elementCount = elementCount;
+       }
+
+       /**
+        * @return the source of randomness
+        */
+       public RandomGenerable<T> getRandomGenerable() {
+               return randomGenerable;
+       }
+
+       /**
+        * @return the index of this block within the list of blocks
+        */
+       public int getBlockIndex() {
+               return blockIndex;
+       }
+
+       /**
+        * @return the total number of blocks
+        */
+       public int getBlockCount() {
+               return blockCount;
+       }
+
+       /**
+        * @return the index of the first element in this block
+        */
+       public long getFirstElement() {
+               return firstElement;
+       }
+
+       /**
+        * @return the total number of elements across all blocks
+        */
+       public long getElementCount() {
+               return elementCount;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
new file mode 100644
index 0000000..2024cae
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+
+/**
+ * Uses a seeded {@link JDKRandomGenerator} to generate seeds for the
+ * distributed collection of {@link JDKRandomGenerator}.
+ */
+public class JDKRandomGeneratorFactory
+extends AbstractGeneratorFactory<JDKRandomGenerator> {
+
+       public static final long DEFAULT_SEED = 0x4b6f7e18198de7a4L;
+
+       public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20;
+
+       private final JDKRandomGenerator random = new JDKRandomGenerator();
+
+       public JDKRandomGeneratorFactory() {
+               this(DEFAULT_SEED);
+       }
+
+       public JDKRandomGeneratorFactory(long seed) {
+               random.setSeed(seed);
+       }
+
+       @Override
+       protected int getMinimumCyclesPerBlock() {
+               return MINIMUM_CYCLES_PER_BLOCK;
+       }
+
+       @Override
+       protected JDKRandomGenerable next() {
+               return new JDKRandomGenerable(random.nextLong());
+       }
+
+       private static class JDKRandomGenerable
+       implements RandomGenerable<JDKRandomGenerator> {
+
+               private final long seed;
+
+               public JDKRandomGenerable(long seed) {
+                       this.seed = seed;
+               }
+
+               @Override
+               public JDKRandomGenerator generator() {
+                       JDKRandomGenerator random = new JDKRandomGenerator();
+
+                       random.setSeed(seed);
+
+                       return random;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
new file mode 100644
index 0000000..22a7b04
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.MersenneTwister;
+
+/**
+ * Uses a seeded {@link MersenneTwister} to generate seeds for the
+ * distributed collection of {@link MersenneTwister}.
+ */
+public class MersenneTwisterFactory
+extends AbstractGeneratorFactory<MersenneTwister> {
+
+       public static final long DEFAULT_SEED = 0x74c8cc8a58a9ceb9L;
+
+       public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20;
+
+       private final MersenneTwister random = new MersenneTwister();
+
+       public MersenneTwisterFactory() {
+               this(DEFAULT_SEED);
+       }
+
+       public MersenneTwisterFactory(long seed) {
+               random.setSeed(seed);
+       }
+
+       @Override
+       protected int getMinimumCyclesPerBlock() {
+               return MINIMUM_CYCLES_PER_BLOCK;
+       }
+
+       @Override
+       protected MersenneTwisterGenerable next() {
+               return new MersenneTwisterGenerable(random.nextLong());
+       }
+
+       private static class MersenneTwisterGenerable
+       implements RandomGenerable<MersenneTwister> {
+
+               private final long seed;
+
+               public MersenneTwisterGenerable(long seed) {
+                       this.seed = seed;
+               }
+
+               @Override
+               public MersenneTwister generator() {
+                       MersenneTwister random = new MersenneTwister();
+
+                       random.setSeed(seed);
+
+                       return random;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
new file mode 100644
index 0000000..318b508
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+/**
+ * A RandomGenerable provides deferred instantiation and initialization of a
+ * RandomGenerator. This allows pre-processing or discovery to be distributed
+ * and performed in parallel by Flink tasks.
+ *
+ * A distributed PRNG is described by Matsumoto and Takuji in
+ * "Dynamic Creation of Pseudorandom Number Generators".
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public interface RandomGenerable<T extends RandomGenerator> {
+
+       /**
+        * Returns an initialized {@link RandomGenerator}.
+        *
+        * @return a {@code RandomGenerator} of type {@code T}
+        */
+       T generator();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
new file mode 100644
index 0000000..ead29fc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+import java.util.List;
+
+/**
+ * A {@code RandomGenerableFactory} returns a scale-free collection of sources
+ * of pseudorandomness which can be used to perform repeatable parallel
+ * computation regardless of parallelism.
+ *
+ * <pre>
+ * {@code
+ * RandomGenerableFactory<JDKRandomGenerator> factory = new 
JDKRandomGeneratorFactory()
+ *
+ * List<BlockInfo<T>> generatorBlocks = factory
+ *     .getRandomGenerables(elementCount, cyclesPerElement);
+ *
+ * DataSet<...> generatedEdges = env
+ *     .fromCollection(generatorBlocks)
+ *         .name("Random generators")
+ *     .flatMap(...
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public interface RandomGenerableFactory<T extends RandomGenerator> {
+
+       /**
+        * The amount of work ({@code elementCount * cyclerPerElement}) is used 
to
+        * generate a list of blocks of work of near-equal size.
+        *
+        * @param elementCount number of elements, as indexed in the {@code 
BlockInfo}
+        * @param cyclesPerElement number of cycles of the PRNG per element
+        * @return the list of configuration blocks
+        */
+       List<BlockInfo<T>> getRandomGenerables(long elementCount, int 
cyclesPerElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
new file mode 100644
index 0000000..1cac80b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+
+public class AbstractGraphTest {
+
+       protected ExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+               env = ExecutionEnvironment.createCollectionsEnvironment();
+               env.getConfig().disableSysoutLogging();
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
new file mode 100644
index 0000000..af47fdc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompleteGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               int vertexCount = 4;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, vertexCount)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3";
+               String edges = "0,1; 0,2; 0,3; 1,0; 1,2; 1,3; 2,0; 2,1; 2,3; 
3,0; 3,1; 3,2";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexCount = 10;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, vertexCount)
+                       .generate();
+
+               assertEquals(vertexCount, graph.numberOfVertices());
+               assertEquals(vertexCount*(vertexCount-1), 
graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(vertexCount - 1, minInDegree);
+               assertEquals(vertexCount - 1, minOutDegree);
+               assertEquals(vertexCount - 1, maxInDegree);
+               assertEquals(vertexCount - 1, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, 10)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
new file mode 100644
index 0000000..fb6799b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CycleGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, 10)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+               String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 
5,4;" +
+                       "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8; 9,0; 0,9";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexCount = 100;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, vertexCount)
+                       .generate();
+
+               assertEquals(vertexCount, graph.numberOfVertices());
+               assertEquals(2 * vertexCount, graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(2, minInDegree);
+               assertEquals(2, minOutDegree);
+               assertEquals(2, maxInDegree);
+               assertEquals(2, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, 100)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
new file mode 100644
index 0000000..bc1ef77
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EmptyGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, 10)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+               String edges = null;
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexCount = 100;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, vertexCount)
+                       .generate();
+
+               assertEquals(vertexCount, graph.numberOfVertices());
+               assertEquals(0, graph.numberOfEdges());
+
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(0, maxInDegree);
+               assertEquals(0, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, 100)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
new file mode 100644
index 0000000..f3fa7db
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GridGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               Graph<LongValue, NullValue, NullValue> graph = new 
GridGraph(env)
+                       .addDimension(2, false)
+                       .addDimension(3, false)
+                       .generate();
+
+               // 0 1 2
+               // 3 4 5
+               String vertices = "0; 1; 2; 3; 4; 5";
+               String edges = "0,1; 0,3; 1,0; 1,2; 1,4; 2,1; 2,5; 3,0; 3,4; 
4,1;" +
+                       "4,3; 4,5; 5,2; 5,4";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               Graph<LongValue, NullValue, NullValue> graph = new 
GridGraph(env)
+                       .addDimension(2, true)
+                       .addDimension(3, true)
+                       .addDimension(5, true)
+                       .addDimension(7, true)
+                       .generate();
+
+               // Each vertex is the source of one edge in the first dimension 
of size 2,
+               // and the source of two edges in each dimension of size 
greater than 2.
+               assertEquals(2*3*5*7, graph.numberOfVertices());
+               assertEquals(7 * 2*3*5*7, graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(7, minInDegree);
+               assertEquals(7, minOutDegree);
+               assertEquals(7, maxInDegree);
+               assertEquals(7, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+                       .addDimension(3, false)
+                       .addDimension(5, false)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
new file mode 100644
index 0000000..77eed89
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class HypercubeGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               int dimensions = 3;
+
+               Graph<LongValue, NullValue, NullValue> graph = new 
HypercubeGraph(env, dimensions)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7";
+               String edges = "0,1; 0,2; 0,4; 1,0; 1,3; 1,5; 2,0; 2,3; 2,6; 
3,1; 3,2; 3,7;" +
+                       "4,0; 4,5; 4,6; 5,1; 5,4; 5,7; 6,2; 6,4; 6,7; 7,3; 7,6; 
7,5";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int dimensions = 10;
+
+               Graph<LongValue, NullValue, NullValue> graph = new 
HypercubeGraph(env, dimensions)
+                       .generate();
+
+               assertEquals(1 << dimensions, graph.numberOfVertices());
+               assertEquals(dimensions * (1 << dimensions), 
graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(dimensions, minInDegree);
+               assertEquals(dimensions, minOutDegree);
+               assertEquals(dimensions, maxInDegree);
+               assertEquals(dimensions, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
HypercubeGraph(env, 4)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
new file mode 100644
index 0000000..b8a409f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PathGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
10)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+               String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 
5,4;" +
+                               "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexCount = 100;
+
+               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
vertexCount)
+                       .generate();
+
+               assertEquals(vertexCount, graph.numberOfVertices());
+               assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(1, minInDegree);
+               assertEquals(1, minOutDegree);
+               assertEquals(2, maxInDegree);
+               assertEquals(2, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
100)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
new file mode 100644
index 0000000..a06c63f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RMatGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               long vertexCount = 100;
+
+               long edgeCount = 1000;
+
+               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+                       .generate();
+
+               assertTrue(vertexCount >= graph.numberOfVertices());
+               assertEquals(edgeCount, graph.numberOfEdges());
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
RMatGraph<>(env, rnd, 100, 1000)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
new file mode 100644
index 0000000..3877717
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SingletonEdgeGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               int vertexPairCount = 5;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+               String edges = "0,1; 1,0; 2,3; 3,2; 4,5; 5,4; 6,7; 7,6; 8,9; 
9,8";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexPairCount = 10;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
+                       .generate();
+
+               assertEquals(2 * vertexPairCount, graph.numberOfVertices());
+               assertEquals(2 * vertexPairCount, graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(1, minInDegree);
+               assertEquals(1, minOutDegree);
+               assertEquals(1, maxInDegree);
+               assertEquals(1, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, 10)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
new file mode 100644
index 0000000..2b090db
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StarGraphTest
+extends AbstractGraphTest {
+
+       @Test
+       public void testGraph()
+                       throws Exception {
+               int vertexCount = 10;
+
+               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
vertexCount)
+                       .generate();
+
+               String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+               String edges = "0,1; 1,0; 0,2; 2,0; 0,3; 3,0; 0,4; 4,0; 0,5; 
5,0;" +
+                               "0,6; 6,0; 0,7; 7,0; 0,8; 8,0; 0,9; 9,0";
+
+               TestUtils.compareGraph(graph, vertices, edges);
+       }
+
+       @Test
+       public void testGraphMetrics()
+                       throws Exception {
+               int vertexCount = 100;
+
+               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
vertexCount)
+                       .generate();
+
+               assertEquals(vertexCount, graph.numberOfVertices());
+               assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
+
+               long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+               long minOutDegree = 
graph.outDegrees().min(1).collect().get(0).f1;
+               long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+               long maxOutDegree = 
graph.outDegrees().max(1).collect().get(0).f1;
+
+               assertEquals(1, minInDegree);
+               assertEquals(1, minOutDegree);
+               assertEquals(vertexCount - 1, maxInDegree);
+               assertEquals(vertexCount - 1, maxOutDegree);
+       }
+
+       @Test
+       public void testParallelism()
+                       throws Exception {
+               int parallelism = 2;
+
+               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
100)
+                       .setParallelism(parallelism)
+                       .generate();
+
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+               TestUtils.verifyParallelism(env, parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
new file mode 100644
index 0000000..3ea5a44
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public final class TestUtils {
+
+       /**
+        * Compare graph vertices and edges against expected values.
+        *
+        * @param graph graph under test
+        * @param expectedVertices vertex labels separated by semi-colons; 
whitespace is ignored
+        * @param expectedEdges edges of the form "source,target" separated by 
semi-colons; whitespace is ignored
+        * @param <K> the key type for edge and vertex identifiers
+        * @param <VV> the value type for vertices
+        * @param <EV> the value type for edges
+        * @throws Exception
+        */
+       public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String 
expectedVertices, String expectedEdges)
+                       throws Exception {
+               // Vertices
+               if (expectedVertices != null) {
+                       List<String> resultVertices = new ArrayList<>();
+
+                       for (Vertex<K, VV> vertex : 
graph.getVertices().collect()) {
+                               resultVertices.add(vertex.f0.toString());
+                       }
+
+                       TestBaseUtils.compareResultAsText(resultVertices, 
expectedVertices.replaceAll("\\s","").replace(";", "\n"));
+               }
+
+               // Edges
+               if (expectedEdges != null) {
+                       List<String> resultEdges = new ArrayList<>();
+
+                       for (Edge<K, EV> edge : graph.getEdges().collect()) {
+                               resultEdges.add(edge.f0.toString() + "," + 
edge.f1.toString());
+                       }
+
+                       TestBaseUtils.compareResultAsText(resultEdges, 
expectedEdges.replaceAll("\\s","").replace(";", "\n"));
+               }
+       }
+
+       /**
+        * Verify operator parallelism.
+        *
+        * @param env the Flink execution environment.
+        * @param expectedParallelism expected operator parallelism
+        */
+       public static void verifyParallelism(ExecutionEnvironment env, int 
expectedParallelism) {
+               env.setParallelism(2 * expectedParallelism);
+
+               Optimizer compiler = new Optimizer(null, new 
DefaultCostEstimator(), new Configuration());
+               OptimizedPlan optimizedPlan = 
compiler.compile(env.createProgramPlan());
+
+               List<PlanNode> queue = new ArrayList<>();
+               queue.addAll(optimizedPlan.getDataSinks());
+
+               while (queue.size() > 0) {
+                       PlanNode node = queue.remove(queue.size() - 1);
+
+                       // Data sources may have parallelism of 1, so simply 
check that the node
+                       // parallelism has not been increased by setting the 
default parallelism
+                       assertTrue("Wrong parallelism for " + node.toString(), 
node.getParallelism() <= expectedParallelism);
+
+                       for (Channel channel : node.getInputs()) {
+                               queue.add(channel.getSource());
+                       }
+               }
+       }
+}

Reply via email to