http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
index a5ea486..8c5ed86 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class GraphMetricsITCase
 extends DriverBaseITCase {
 
-       public GraphMetricsITCase(TestExecutionMode mode) {
-               super(mode);
+       public GraphMetricsITCase(String idType, TestExecutionMode mode) {
+               super(idType, mode);
+       }
+
+       private String[] parameters(int scale, String order, String output) {
+               return new String[] {
+                       "--algorithm", "GraphMetrics", "--order", order,
+                       "--input", "RMatGraph", "--scale", 
Integer.toString(scale), "--type", idType, "--simplify", order,
+                       "--output", output};
        }
 
        @Test
@@ -43,58 +50,110 @@ extends DriverBaseITCase {
        }
 
        @Test
-       public void testWithDirectedRMatIntegerGraph() throws Exception {
+       public void testWithSmallDirectedRMatIntegerGraph() throws Exception {
                String expected = "\n" +
                        "Vertex metrics:\n" +
-                       "  vertex count: 902\n" +
-                       "  edge count: 12,009\n" +
-                       "  unidirectional edge count: 8,875\n" +
-                       "  bidirectional edge count: 1,567\n" +
-                       "  average degree: 13.314\n" +
-                       "  density: 0.01477663\n" +
-                       "  triplet count: 1,003,442\n" +
-                       "  maximum degree: 463\n" +
-                       "  maximum out degree: 334\n" +
-                       "  maximum in degree: 342\n" +
-                       "  maximum triplets: 106,953\n" +
+                       "  vertex count: 117\n" +
+                       "  edge count: 1,168\n" +
+                       "  unidirectional edge count: 686\n" +
+                       "  bidirectional edge count: 241\n" +
+                       "  average degree: 9.983\n" +
+                       "  density: 0.08605953\n" +
+                       "  triplet count: 29,286\n" +
+                       "  maximum degree: 91\n" +
+                       "  maximum out degree: 77\n" +
+                       "  maximum in degree: 68\n" +
+                       "  maximum triplets: 4,095\n" +
                        "\n" +
                        "Edge metrics:\n" +
-                       "  triangle triplet count: 107,817\n" +
-                       "  rectangle triplet count: 315,537\n" +
-                       "  maximum triangle triplets: 820\n" +
-                       "  maximum rectangle triplets: 3,822\n";
+                       "  triangle triplet count: 4,575\n" +
+                       "  rectangle triplet count: 11,756\n" +
+                       "  maximum triangle triplets: 153\n" +
+                       "  maximum rectangle triplets: 391\n";
+
+               expectedOutput(parameters(7, "directed", "hash"), expected);
+               expectedOutput(parameters(7, "directed", "print"), expected);
+       }
+
+       @Test
+       public void testWithLargeDirectedRMatIntegerGraph() throws Exception {
+               // skip 'byte' which cannot store vertex IDs for scale > 8
+               Assume.assumeFalse(idType.equals("byte") || 
idType.equals("nativeByte"));
+
+               // skip 'string' which does not compare numerically and 
generates a different triangle count
+               Assume.assumeFalse(idType.equals("string") || 
idType.equals("nativeString"));
 
-               String[] arguments = new String[]{"--algorithm", 
"GraphMetrics", "--order", "directed",
-                       "--input", "RMatGraph", "--type", "integer", 
"--simplify", "directed",
-                       "--output"};
+               String expected = "\n" +
+                       "Vertex metrics:\n" +
+                       "  vertex count: 3,349\n" +
+                       "  edge count: 53,368\n" +
+                       "  unidirectional edge count: 43,602\n" +
+                       "  bidirectional edge count: 4,883\n" +
+                       "  average degree: 15.936\n" +
+                       "  density: 0.00475971\n" +
+                       "  triplet count: 9,276,207\n" +
+                       "  maximum degree: 1,356\n" +
+                       "  maximum out degree: 921\n" +
+                       "  maximum in degree: 966\n" +
+                       "  maximum triplets: 918,690\n" +
+                       "\n" +
+                       "Edge metrics:\n" +
+                       "  triangle triplet count: 779,202\n" +
+                       "  rectangle triplet count: 2,506,371\n" +
+                       "  maximum triangle triplets: 3,160\n" +
+                       "  maximum rectangle triplets: 16,835\n";
 
-               expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
-               expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+               expectedOutput(parameters(12, "directed", "hash"), expected);
+               expectedOutput(parameters(12, "directed", "print"), expected);
        }
 
        @Test
-       public void testWithUndirectedRMatIntegerGraph() throws Exception {
+       public void testWithSmallUndirectedRMatIntegerGraph() throws Exception {
                String expected = "\n" +
                        "Vertex metrics:\n" +
-                       "  vertex count: 902\n" +
-                       "  edge count: 10,442\n" +
-                       "  average degree: 23.153\n" +
-                       "  density: 0.025697\n" +
-                       "  triplet count: 1,003,442\n" +
-                       "  maximum degree: 463\n" +
-                       "  maximum triplets: 106,953\n" +
+                       "  vertex count: 117\n" +
+                       "  edge count: 927\n" +
+                       "  average degree: 15.846\n" +
+                       "  density: 0.13660477\n" +
+                       "  triplet count: 29,286\n" +
+                       "  maximum degree: 91\n" +
+                       "  maximum triplets: 4,095\n" +
                        "\n" +
                        "Edge metrics:\n" +
-                       "  triangle triplet count: 107,817\n" +
-                       "  rectangle triplet count: 315,537\n" +
-                       "  maximum triangle triplets: 820\n" +
-                       "  maximum rectangle triplets: 3,822\n";
+                       "  triangle triplet count: 4,575\n" +
+                       "  rectangle triplet count: 11,756\n" +
+                       "  maximum triangle triplets: 153\n" +
+                       "  maximum rectangle triplets: 391\n";
+
+               expectedOutput(parameters(7, "undirected", "hash"), expected);
+               expectedOutput(parameters(7, "undirected", "print"), expected);
+       }
+
+       @Test
+       public void testWithLargeUndirectedRMatIntegerGraph() throws Exception 
{
+               // skip 'byte' which cannot store vertex IDs for scale > 8
+               Assume.assumeFalse(idType.equals("byte") || 
idType.equals("nativeByte"));
 
-               String[] arguments = new String[]{"--algorithm", 
"GraphMetrics", "--order", "undirected",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "undirected",
-                               "--output"};
+               // skip 'string' which does not compare numerically and 
generates a different triangle count
+               Assume.assumeFalse(idType.equals("string") || 
idType.equals("nativeString"));
+
+               String expected = "\n" +
+                       "Vertex metrics:\n" +
+                       "  vertex count: 3,349\n" +
+                       "  edge count: 48,485\n" +
+                       "  average degree: 28.955\n" +
+                       "  density: 0.00864842\n" +
+                       "  triplet count: 9,276,207\n" +
+                       "  maximum degree: 1,356\n" +
+                       "  maximum triplets: 918,690\n" +
+                       "\n" +
+                       "Edge metrics:\n" +
+                       "  triangle triplet count: 779,202\n" +
+                       "  rectangle triplet count: 2,506,371\n" +
+                       "  maximum triangle triplets: 3,160\n" +
+                       "  maximum rectangle triplets: 16,835\n";
 
-               expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
-               expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+               expectedOutput(parameters(12, "undirected", "hash"), expected);
+               expectedOutput(parameters(12, "undirected", "print"), expected);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
index 5474d1b..282d3d5 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class HITSITCase
 extends DriverBaseITCase {
 
-       public HITSITCase(TestExecutionMode mode) {
-               super(mode);
+       public HITSITCase(String idType, TestExecutionMode mode) {
+               super(idType, mode);
+       }
+
+       private String[] parameters(int scale, String output) {
+               return new String[] {
+                       "--algorithm", "HITS",
+                       "--input", "RMatGraph", "--scale", 
Integer.toString(scale), "--type", idType, "--simplify", "directed",
+                       "--output", output};
        }
 
        @Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
        }
 
        @Test
-       public void testPrintWithRMatIntegerGraph() throws Exception {
-               expectedCount(
-                       new String[]{"--algorithm", "HITS",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "directed",
-                               "--output", "print"},
-                       902);
+       public void testPrintWithSmallRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               expectedCount(parameters(8, "print"), 233);
+       }
+
+       @Test
+       public void testPrintWithLargeRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               // skip 'byte' which cannot store vertex IDs for scale > 8
+               Assume.assumeFalse(idType.equals("byte") || 
idType.equals("nativeByte"));
+
+               expectedCount(parameters(12, "print"), 3349);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
index 0632856..0391771 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -18,17 +18,29 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class JaccardIndexITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-       public JaccardIndexITCase(TestExecutionMode mode) {
-               super(mode);
+       public JaccardIndexITCase(String idType, TestExecutionMode mode) {
+               super(idType, mode);
+       }
+
+       private String[] parameters(int scale, String output, String... 
additionalParameters) {
+               String[] parameters = new String[] {
+                       "--algorithm", "JaccardIndex", "--mirror_results",
+                       "--input", "RMatGraph", "--scale", 
Integer.toString(scale), "--type", idType, "--simplify", "undirected",
+                       "--output", output};
+
+               return ArrayUtils.addAll(parameters, additionalParameters);
        }
 
        @Test
@@ -42,22 +54,67 @@ extends DriverBaseITCase {
        }
 
        @Test
-       public void testHashWithRMatIntegerGraph() throws Exception {
-               String expected = "\nChecksumHashCode 0x0001b188570b2572, count 
221628\n";
-
-               expectedOutput(
-                       new String[]{"--algorithm", "JaccardIndex",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "undirected",
-                               "--output", "hash"},
-                       expected);
+       public void testHashWithSmallRMatGraph() throws Exception {
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x0000164757052eebL;
+                               break;
+
+                       case "long":
+                               checksum = 0x000016337a6a7270L;
+                               break;
+
+                       case "string":
+                               checksum = 0x00001622a522a290L;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               expectedChecksum(parameters(7, "hash"), 11388, checksum);
        }
 
        @Test
-       public void testPrintWithRMatIntegerGraph() throws Exception {
-               expectedCount(
-                       new String[]{"--algorithm", "JaccardIndex",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "undirected",
-                               "--output", "print"},
-                       221628);
+       public void testHashWithLargeRMatGraph() throws Exception {
+               // computation is too large for collection mode
+               Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                               return;
+
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x0021ce158d811c4eL;
+                               break;
+
+                       case "long":
+                               checksum = 0x0021d20fb3904720L;
+                               break;
+
+                       case "string":
+                               checksum = 0x0021cd8fafec1524L;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               expectedChecksum(parameters(12, "hash"), 4432058, checksum);
+       }
+
+       @Test
+       public void testPrintWithSmallRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               expectedOutputChecksum(parameters(7, "print"), new 
Checksum(11388, 0x0000163b17088256L));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
index d7301d0..4ca0a85 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class PageRankITCase
 extends DriverBaseITCase {
 
-       public PageRankITCase(TestExecutionMode mode) {
-               super(mode);
+       public PageRankITCase(String idType, TestExecutionMode mode) {
+               super(idType, mode);
+       }
+
+       private String[] parameters(int scale, String output) {
+               return new String[] {
+                       "--algorithm", "PageRank",
+                       "--input", "RMatGraph", "--scale", 
Integer.toString(scale), "--type", idType, "--simplify", "directed",
+                       "--output", output};
        }
 
        @Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
        }
 
        @Test
-       public void testPrintWithRMatIntegerGraph() throws Exception {
-               expectedCount(
-                       new String[]{"--algorithm", "PageRank",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "directed",
-                               "--output", "print"},
-                       902);
+       public void testPrintWithSmallRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               expectedCount(parameters(8, "print"), 233);
+       }
+
+       @Test
+       public void testPrintWithLargeRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               // skip 'byte' which cannot store vertex IDs for scale > 8
+               Assume.assumeFalse(idType.equals("byte") || 
idType.equals("nativeByte"));
+
+               expectedCount(parameters(12, "print"), 3349);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
index 0d2897c..fabdae1 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -18,17 +18,33 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TriangleListingITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-       public TriangleListingITCase(TestExecutionMode mode) {
-               super(mode);
+       public TriangleListingITCase(String idType, TestExecutionMode mode) {
+               super(idType, mode);
+       }
+
+       private String[] parameters(int scale, String order, String output) {
+               String[] parameters =  new String[] {
+                       "--algorithm", "TriangleListing", "--order", order,
+                       "--input", "RMatGraph", "--scale", 
Integer.toString(scale), "--type", idType, "--simplify", order,
+                       "--output", output};
+
+               if (output.equals("hash")) {
+                       return ArrayUtils.addAll(parameters, 
"--sort_triangle_vertices", "--triadic_census");
+               } else {
+                       return parameters;
+               }
        }
 
        @Test
@@ -42,66 +58,224 @@ extends DriverBaseITCase {
        }
 
        @Test
-       public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+       public void testHashWithSmallDirectedRMatGraph() throws Exception {
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x000000003d2f0a9aL;
+                               break;
+
+                       case "long":
+                               checksum = 0x000000016aba3720L;
+                               break;
+
+                       case "string":
+                               checksum = 0x0000005bfef84facL;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
                String expected = "\n" +
-                       "ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+                       new Checksum(3822, checksum) + "\n" +
                        "Triadic census:\n" +
-                       "  003: 113,435,893\n" +
-                       "  012: 6,632,528\n" +
-                       "  102: 983,535\n" +
-                       "  021d: 118,574\n" +
-                       "  021u: 118,566\n" +
-                       "  021c: 237,767\n" +
-                       "  111d: 129,773\n" +
-                       "  111u: 130,041\n" +
-                       "  030t: 16,981\n" +
-                       "  030c: 5,535\n" +
-                       "  201: 43,574\n" +
-                       "  120d: 7,449\n" +
-                       "  120u: 7,587\n" +
-                       "  120c: 15,178\n" +
-                       "  210: 17,368\n" +
-                       "  300: 4,951\n";
-
-               expectedOutput(
-                       new String[]{"--algorithm", "TriangleListing", 
"--order", "directed", "--sort_triangle_vertices", "--triadic_census",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "directed",
-                               "--output", "hash"},
-                       expected);
+                       "  003: 178,989\n" +
+                       "  012: 47,736\n" +
+                       "  102: 11,763\n" +
+                       "  021d: 2,258\n" +
+                       "  021u: 2,064\n" +
+                       "  021c: 4,426\n" +
+                       "  111d: 3,359\n" +
+                       "  111u: 3,747\n" +
+                       "  030t: 624\n" +
+                       "  030c: 220\n" +
+                       "  201: 1,966\n" +
+                       "  120d: 352\n" +
+                       "  120u: 394\n" +
+                       "  120c: 704\n" +
+                       "  210: 1,120\n" +
+                       "  300: 408\n";
+
+               expectedOutput(parameters(7, "directed", "hash"), expected);
        }
 
        @Test
-       public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
-               expectedCount(
-                       new String[]{"--algorithm", "TriangleListing", 
"--order", "directed",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "directed",
-                               "--output", "print"},
-                       75049);
+       public void testHashWithSmallUndirectedRMatGraph() throws Exception {
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x0000000001f92b0cL;
+                               break;
+
+                       case "long":
+                               checksum = 0x000000000bb355c6L;
+                               break;
+
+                       case "string":
+                               checksum = 0x00000002f7b5576aL;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               String expected = "\n" +
+                       new Checksum(3822, checksum) + "\n" +
+                       "Triadic census:\n" +
+                       "  03: 178,989\n" +
+                       "  12: 59,499\n" +
+                       "  21: 17,820\n" +
+                       "  30: 3,822\n";
+
+               expectedOutput(parameters(7, "undirected", "hash"), expected);
        }
 
        @Test
-       public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+       public void testHashWithLargeDirectedRMatGraph() throws Exception {
+               // computation is too large for collection mode
+               Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                               return;
+
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x00000248fef26209L;
+                               break;
+
+                       case "long":
+                               checksum = 0x000002dcdf0fbb1bL;
+                               break;
+
+                       case "string":
+                               checksum = 0x00035b760ab9da74L;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
                String expected = "\n" +
-                       "ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+                       new Checksum(479818, checksum) + "\n" +
                        "Triadic census:\n" +
-                       "  03: 113,435,893\n" +
-                       "  12: 7,616,063\n" +
-                       "  21: 778,295\n" +
-                       "  30: 75,049\n";
-
-               expectedOutput(
-                       new String[]{"--algorithm", "TriangleListing", 
"--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "undirected",
-                               "--output", "hash"},
-                       expected);
+                       "  003: 6,101,196,568\n" +
+                       "  012: 132,051,207\n" +
+                       "  102: 13,115,128\n" +
+                       "  021d: 1,330,423\n" +
+                       "  021u: 1,336,897\n" +
+                       "  021c: 2,669,285\n" +
+                       "  111d: 1,112,144\n" +
+                       "  111u: 1,097,452\n" +
+                       "  030t: 132,048\n" +
+                       "  030c: 44,127\n" +
+                       "  201: 290,552\n" +
+                       "  120d: 47,734\n" +
+                       "  120u: 47,780\n" +
+                       "  120c: 95,855\n" +
+                       "  210: 90,618\n" +
+                       "  300: 21,656\n";
+
+               expectedOutput(parameters(12, "directed", "hash"), expected);
        }
 
        @Test
-       public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
-               expectedCount(
-                       new String[]{"--algorithm", "TriangleListing", 
"--order", "undirected",
-                               "--input", "RMatGraph", "--type", "integer", 
"--simplify", "undirected",
-                               "--output", "print"},
-                       75049);
+       public void testHashWithLargeUndirectedRMatGraph() throws Exception {
+               // computation is too large for collection mode
+               Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                               return;
+
+                       case "short":
+                       case "char":
+                       case "integer":
+                               checksum = 0x00000012dee4bf2cL;
+                               break;
+
+                       case "long":
+                               checksum = 0x00000017a40efbdaL;
+                               break;
+
+                       case "string":
+                               checksum = 0x000159e8be3e370bL;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               String expected = "\n" +
+                       new Checksum(479818, checksum) + "\n" +
+                       "Triadic census:\n" +
+                       "  03: 6,101,196,568\n" +
+                       "  12: 145,166,335\n" +
+                       "  21: 7,836,753\n" +
+                       "  30: 479,818\n";
+
+               expectedOutput(parameters(12, "undirected", "hash"), expected);
+       }
+
+
+       @Test
+       public void testPrintWithSmallDirectedRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                       case "short":
+                       case "integer":
+                       case "long":
+                               checksum = 0x00000764227995aaL;
+                               break;
+
+                       case "string":
+                               checksum = 0x000007643d93c30aL;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               expectedOutputChecksum(parameters(7, "directed", "print"), new 
Checksum(3822, checksum));
+       }
+
+
+       @Test
+       public void testPrintWithSmallUndirectedRMatGraph() throws Exception {
+               // skip 'char' since it is not printed as a number
+               Assume.assumeFalse(idType.equals("char") || 
idType.equals("nativeChar"));
+
+               long checksum;
+               switch (idType) {
+                       case "byte":
+                       case "short":
+                       case "integer":
+                       case "long":
+                               checksum = 0x0000077d1582e206L;
+                               break;
+
+                       case "string":
+                               checksum = 0x0000077a49cb5268L;
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
type: " + idType);
+               }
+
+               expectedOutputChecksum(parameters(7, "undirected", "print"), 
new Checksum(3822, checksum));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
new file mode 100644
index 0000000..da77b0d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToChar;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToCharValue;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToString;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByte;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByteValue;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedInt;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToLong;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShort;
+import 
org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShortValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@link LongValue} ID translators nested in
+ * {@code GeneratedGraph}.
+ *
+ * <p>For each narrow target type the tests check a value at zero, the
+ * boundary values of the accepted input range, and that the first value
+ * above the range and {@code -1} are rejected with an
+ * {@link IllegalArgumentException}. The {@code long} and {@code String}
+ * translators are only checked at zero and at the extremes of {@code long}.
+ */
+public class GeneratedGraphTest {
+
+       // number of distinct values of each narrow type, i.e. the first
+       // non-negative input that no longer fits
+       private static final long BYTE_RANGE = 1L << Byte.SIZE;
+       private static final long SHORT_RANGE = 1L << Short.SIZE;
+       private static final long CHAR_RANGE = 1L << Character.SIZE;
+       private static final long INT_RANGE = 1L << Integer.SIZE;
+
+       // translators under test
+       private final TranslateFunction<LongValue, ByteValue> toByteValue = new LongValueToUnsignedByteValue();
+       private final TranslateFunction<LongValue, Byte> toByte = new LongValueToUnsignedByte();
+       private final TranslateFunction<LongValue, ShortValue> toShortValue = new LongValueToUnsignedShortValue();
+       private final TranslateFunction<LongValue, Short> toShort = new LongValueToUnsignedShort();
+       private final TranslateFunction<LongValue, CharValue> toCharValue = new LongValueToCharValue();
+       private final TranslateFunction<LongValue, Character> toCharacter = new LongValueToChar();
+       private final TranslateFunction<LongValue, Integer> toInteger = new LongValueToUnsignedInt();
+       private final TranslateFunction<LongValue, Long> toLong = new LongValueToLong();
+       private final TranslateFunction<LongValue, String> toStringId = new LongValueToString();
+
+       // reuse objects handed to the translators producing mutable value types
+       private final ByteValue reusedByteValue = new ByteValue();
+       private final ShortValue reusedShortValue = new ShortValue();
+       private final CharValue reusedCharValue = new CharValue();
+
+       /** Wraps a primitive in the translators' input type. */
+       private static LongValue longValue(long value) {
+               return new LongValue(value);
+       }
+
+       // ByteValue
+
+       @Test
+       public void testByteValueTranslation() throws Exception {
+               assertEquals(new ByteValue((byte) 0),
+                       toByteValue.translate(longValue(0L), reusedByteValue));
+               assertEquals(new ByteValue(Byte.MIN_VALUE),
+                       toByteValue.translate(longValue(Byte.MAX_VALUE + 1L), reusedByteValue));
+               assertEquals(new ByteValue((byte) -1),
+                       toByteValue.translate(longValue(BYTE_RANGE - 1), reusedByteValue));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testByteValueTranslationUpperOutOfRange() throws Exception {
+               toByteValue.translate(longValue(BYTE_RANGE), reusedByteValue);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testByteValueTranslationLowerOutOfRange() throws Exception {
+               toByteValue.translate(longValue(-1L), reusedByteValue);
+       }
+
+       // Byte
+
+       @Test
+       public void testByteTranslation() throws Exception {
+               assertEquals(Byte.valueOf((byte) 0),
+                       toByte.translate(longValue(0L), null));
+               assertEquals(Byte.valueOf(Byte.MIN_VALUE),
+                       toByte.translate(longValue(Byte.MAX_VALUE + 1L), null));
+               assertEquals(Byte.valueOf((byte) -1),
+                       toByte.translate(longValue(BYTE_RANGE - 1), null));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testByteTranslationUpperOutOfRange() throws Exception {
+               toByte.translate(longValue(BYTE_RANGE), null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testByteTranslationLowerOutOfRange() throws Exception {
+               toByte.translate(longValue(-1L), null);
+       }
+
+       // ShortValue
+
+       @Test
+       public void testShortValueTranslation() throws Exception {
+               assertEquals(new ShortValue((short) 0),
+                       toShortValue.translate(longValue(0L), reusedShortValue));
+               assertEquals(new ShortValue(Short.MIN_VALUE),
+                       toShortValue.translate(longValue(Short.MAX_VALUE + 1L), reusedShortValue));
+               assertEquals(new ShortValue((short) -1),
+                       toShortValue.translate(longValue(SHORT_RANGE - 1), reusedShortValue));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testShortValueTranslationUpperOutOfRange() throws Exception {
+               toShortValue.translate(longValue(SHORT_RANGE), reusedShortValue);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testShortValueTranslationLowerOutOfRange() throws Exception {
+               toShortValue.translate(longValue(-1L), reusedShortValue);
+       }
+
+       // Short
+
+       @Test
+       public void testShortTranslation() throws Exception {
+               assertEquals(Short.valueOf((short) 0),
+                       toShort.translate(longValue(0L), null));
+               assertEquals(Short.valueOf(Short.MIN_VALUE),
+                       toShort.translate(longValue(Short.MAX_VALUE + 1L), null));
+               assertEquals(Short.valueOf((short) -1),
+                       toShort.translate(longValue(SHORT_RANGE - 1), null));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testShortTranslationUpperOutOfRange() throws Exception {
+               toShort.translate(longValue(SHORT_RANGE), null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testShortTranslationLowerOutOfRange() throws Exception {
+               toShort.translate(longValue(-1L), null);
+       }
+
+       // CharValue
+
+       @Test
+       public void testCharValueTranslation() throws Exception {
+               assertEquals(new CharValue((char) 0),
+                       toCharValue.translate(longValue(0L), reusedCharValue));
+               assertEquals(new CharValue(Character.MAX_VALUE),
+                       toCharValue.translate(longValue((long) Character.MAX_VALUE), reusedCharValue));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testCharValueTranslationUpperOutOfRange() throws Exception {
+               toCharValue.translate(longValue(CHAR_RANGE), reusedCharValue);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testCharValueTranslationLowerOutOfRange() throws Exception {
+               toCharValue.translate(longValue(-1L), reusedCharValue);
+       }
+
+       // Character
+
+       @Test
+       public void testCharacterTranslation() throws Exception {
+               assertEquals(Character.valueOf((char) 0),
+                       toCharacter.translate(longValue(0L), null));
+               assertEquals(Character.valueOf(Character.MAX_VALUE),
+                       toCharacter.translate(longValue((long) Character.MAX_VALUE), null));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testCharacterTranslationUpperOutOfRange() throws Exception {
+               toCharacter.translate(longValue(CHAR_RANGE), null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testCharacterTranslationLowerOutOfRange() throws Exception {
+               toCharacter.translate(longValue(-1L), null);
+       }
+
+       // Integer
+
+       @Test
+       public void testIntegerTranslation() throws Exception {
+               assertEquals(Integer.valueOf(0),
+                       toInteger.translate(longValue(0L), null));
+               assertEquals(Integer.valueOf(Integer.MIN_VALUE),
+                       toInteger.translate(longValue(Integer.MAX_VALUE + 1L), null));
+               assertEquals(Integer.valueOf(-1),
+                       toInteger.translate(longValue(INT_RANGE - 1), null));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIntegerTranslationUpperOutOfRange() throws Exception {
+               toInteger.translate(longValue(INT_RANGE), null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIntegerTranslationLowerOutOfRange() throws Exception {
+               toInteger.translate(longValue(-1L), null);
+       }
+
+       // Long
+
+       @Test
+       public void testLongTranslation() throws Exception {
+               assertEquals(Long.valueOf(0L),
+                       toLong.translate(longValue(0L), null));
+               assertEquals(Long.valueOf(Long.MIN_VALUE),
+                       toLong.translate(longValue(Long.MIN_VALUE), null));
+               assertEquals(Long.valueOf(Long.MAX_VALUE),
+                       toLong.translate(longValue(Long.MAX_VALUE), null));
+       }
+
+       // String
+
+       @Test
+       public void testStringTranslation() throws Exception {
+               assertEquals("0",
+                       toStringId.translate(longValue(0L), null));
+               assertEquals("-9223372036854775808",
+                       toStringId.translate(longValue(Long.MIN_VALUE), null));
+               assertEquals("9223372036854775807",
+                       toStringId.translate(longValue(Long.MAX_VALUE), null));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
index 038a2e4..30a74df 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.MathUtils;
 public class LongValueToSignedIntValue
 implements TranslateFunction<LongValue, IntValue> {
 
+       public static final long MAX_VERTEX_COUNT = 1L << 32;
+
        @Override
        public IntValue translate(LongValue value, IntValue reuse)
                        throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
index 8fe665c..741bd62 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
 public class LongValueToUnsignedIntValue
 implements TranslateFunction<LongValue, IntValue> {
 
+       public static final long MAX_VERTEX_COUNT = 1L << 32;
+
        @Override
        public IntValue translate(LongValue value, IntValue reuse)
                        throws Exception {
@@ -39,10 +41,10 @@ implements TranslateFunction<LongValue, IntValue> {
 
                long l = value.getValue();
 
-               if (l < 0 || l >= (1L << 32)) {
+               if (l < 0 || l >= MAX_VERTEX_COUNT) {
                        throw new IllegalArgumentException("Cannot cast long 
value " + value + " to integer.");
                } else {
-                       reuse.setValue((int)(l & 0xffffffffL));
+                       reuse.setValue((int) (l & (MAX_VERTEX_COUNT - 1)));
                }
 
                return reuse;

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index dfa7eb2..84cb791 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -80,17 +80,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @ForwardedFields("*->f0")
-       public class LinkVertexToAll
+       private static class LinkVertexToAll
        implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
-
                private final long vertexCount;
 
                private LongValue target = new LongValue();
 
                private Edge<LongValue, NullValue> edge = new Edge<>(null, 
target, NullValue.getInstance());
 
-               public LinkVertexToAll(long vertex_count) {
-                       this.vertexCount = vertex_count;
+               public LinkVertexToAll(long vertexCount) {
+                       this.vertexCount = vertexCount;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 0ca804e..16a9ab9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -107,7 +107,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @ForwardedFields("*->f0")
-       public class LinkVertexToNeighbors
+       private static class LinkVertexToNeighbors
        implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private long vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 95a4f85..0fa4127 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -166,7 +166,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
                return Graph.fromDataSet(vertices, edges, env);
        }
 
-       private static final class GenerateEdges<T extends RandomGenerator>
+       private static class GenerateEdges<T extends RandomGenerator>
        implements FlatMapFunction<BlockInfo<T>, Edge<LongValue, NullValue>> {
 
                // Configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 6c7c433..316d749 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -80,7 +80,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @ForwardedFields("*->f0")
-       public class LinkVertexToCenter
+       private static class LinkVertexToCenter
        implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private LongValue center = new LongValue(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 4857add..ba1ab21 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -60,8 +60,6 @@ import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * <p>
  * http://www.cs.cornell.edu/home/kleinber/auth.pdf
  *
- * http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
  * @param <K> graph ID type
  * @param <VV> vertex value type
  * @param <EV> edge value type

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index bc3cb86..b3e69f1 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -80,6 +80,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
        private int maximumScoreDenominator = 1;
 
+       private boolean mirrorResults;
+
        private int littleParallelism = PARALLELISM_DEFAULT;
 
        /**
@@ -141,6 +143,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        /**
+        * By default only one result is output for each pair of vertices. When
+        * mirroring is enabled, a second result with the vertex order flipped
+        * is also output for each pair of vertices.
+        *
+        * @param mirrorResults whether output results should be mirrored
+        * @return this algorithm instance, so configuration calls can be chained
+        */
+       public JaccardIndex<K, VV, EV> setMirrorResults(boolean mirrorResults) {
+               this.mirrorResults = mirrorResults;
+
+               return this;
+       }
+
+       /**
         * Override the parallelism of operators processing small amounts of 
data.
         *
         * @param littleParallelism operator parallelism
@@ -176,7 +192,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                        minimumScoreNumerator != rhs.minimumScoreNumerator ||
                        minimumScoreDenominator != rhs.minimumScoreDenominator 
||
                        maximumScoreNumerator != rhs.maximumScoreNumerator ||
-                       maximumScoreDenominator != rhs.maximumScoreDenominator) 
{
+                       maximumScoreDenominator != rhs.maximumScoreDenominator 
||
+                       mirrorResults != rhs.mirrorResults) {
                        return false;
                }
 
@@ -230,12 +247,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
                                .name("Generate group pairs");
 
                // t, u, intersection, union
-               return twoPaths
+               DataSet<Result<K>> scores = twoPaths
                        .groupBy(0, 1)
                        .reduceGroup(new ComputeScores<K>(unboundedScores,
                                        minimumScoreNumerator, 
minimumScoreDenominator,
                                        maximumScoreNumerator, 
maximumScoreDenominator))
                                .name("Compute scores");
+
+               if (mirrorResults) {
+                       scores = scores
+                               .flatMap(new MirrorResult<K, Result<K>>())
+                                       .name("Mirror results");
+               }
+
+               return scores;
        }
 
        /**
@@ -451,6 +476,27 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, 
Result<K>> {
        }
 
        /**
+        * Emits every incoming result twice: first as received, then again
+        * after exchanging its two vertex IDs. No copy is made; the same
+        * instance is modified in place and collected a second time.
+        *
+        * <p>NOTE(review): this relies on the collector tolerating changes to
+        * an object that has already been emitted - confirm against the
+        * object-reuse rules of the runtime.
+        *
+        * @param <T> ID type
+        * @param <RT> result type
+        */
+       private static class MirrorResult<T, RT extends BinaryResult<T>>
+       implements FlatMapFunction<RT, RT> {
+               @Override
+               public void flatMap(RT value, Collector<RT> out)
+                               throws Exception {
+                       // forward the result in its original orientation
+                       out.collect(value);
+
+                       // exchange the two IDs and forward the flipped result
+                       T formerFirst = value.getVertexId0();
+                       T formerSecond = value.getVertexId1();
+                       value.setVertexId0(formerSecond);
+                       value.setVertexId1(formerFirst);
+                       out.collect(value);
+               }
+       }
+
+       /**
         * Wraps the vertex type to encapsulate results from the jaccard index 
algorithm.
         *
         * @param <T> ID type

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1811038..469a23f 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -60,7 +60,7 @@ public class AsmTestBase {
                env = ExecutionEnvironment.createCollectionsEnvironment();
                env.getConfig().enableObjectReuse();
 
-               // the "fish" graph
+               // a "fish" graph
                Object[][] edges = new Object[][]{
                        new Object[]{0, 1},
                        new Object[]{0, 2},

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
index 976cc34..f730adf 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
@@ -40,11 +40,11 @@ public class LongValueToSignedIntValueTest {
 
        @Test(expected=IllegalArgumentException.class)
        public void testUpperOutOfRange() throws Exception {
-               assertEquals(new IntValue(), translator.translate(new 
LongValue((long)Integer.MAX_VALUE + 1), reuse));
+               translator.translate(new LongValue((long)Integer.MAX_VALUE + 
1), reuse);
        }
 
        @Test(expected=IllegalArgumentException.class)
        public void testLowerOutOfRange() throws Exception {
-               assertEquals(new IntValue(), translator.translate(new 
LongValue((long)Integer.MIN_VALUE - 1), reuse));
+               translator.translate(new LongValue((long)Integer.MIN_VALUE - 
1), reuse);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
index 6a2ae83..ca70162 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
@@ -34,17 +34,17 @@ public class LongValueToUnsignedIntValueTest {
        @Test
        public void testTranslation() throws Exception {
                assertEquals(new IntValue(0), translator.translate(new 
LongValue(0L), reuse));
-               assertEquals(new IntValue(Integer.MIN_VALUE), 
translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+               assertEquals(new IntValue(Integer.MIN_VALUE), 
translator.translate(new LongValue((long) Integer.MAX_VALUE + 1), reuse));
                assertEquals(new IntValue(-1), translator.translate(new 
LongValue((1L << 32) - 1), reuse));
        }
 
        @Test(expected=IllegalArgumentException.class)
        public void testUpperOutOfRange() throws Exception {
-               assertEquals(new IntValue(), translator.translate(new 
LongValue(1L << 32), reuse));
+               translator.translate(new LongValue(1L << 32), reuse);
        }
 
        @Test(expected=IllegalArgumentException.class)
        public void testLowerOutOfRange() throws Exception {
-               assertEquals(new IntValue(), translator.translate(new 
LongValue(-1), reuse));
+               translator.translate(new LongValue(-1), reuse);
        }
 }

Reply via email to