[hotfix] [gelly] Don't print Null edge values

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22475023
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22475023
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22475023

Branch: refs/heads/master
Commit: 224750234758d4cc2234e98fd9d26ae794e0d669
Parents: 568e642
Author: Greg Hogan <c...@greghogan.com>
Authored: Tue Apr 18 09:28:31 2017 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Tue Apr 18 09:28:31 2017 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/EdgeList.java    | 53 +++++++++++++++++---
 .../flink/graph/utils/EdgeToTuple2Map.java      | 45 +++++++++++++++++
 2 files changed, 91 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22475023/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
index 85f32c3..524e70f 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
@@ -28,11 +32,17 @@ import org.apache.flink.graph.drivers.output.CSV;
 import org.apache.flink.graph.drivers.output.Hash;
 import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.utils.EdgeToTuple2Map;
+import org.apache.flink.types.NullValue;
 
 import java.util.List;
 
 /**
- * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s.
+ * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
  */
 public class EdgeList<K, VV, EV>
 extends ParameterizedBase
@@ -77,16 +87,45 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
                // Refactored due to openjdk7 compile error: 
https://travis-ci.org/greghogan/flink/builds/200487761
                List<Edge<K, EV>> records = 
collector.run(edges).execute(executionName);
 
-               for (Edge<K, EV> result : records) {
-                       System.out.println(result);
+               if (hasNullValueEdges(edges)) {
+                       for (Edge<K, EV> result : records) {
+                               System.out.println("(" + result.f0 + "," + 
result.f1 + ")");
+                       }
+               } else {
+                       for (Edge<K, EV> result : records) {
+                               System.out.println(result);
+                       }
                }
-
        }
 
        @Override
        public void writeCSV(String filename, String lineDelimiter, String 
fieldDelimiter) {
-               edges
-                       .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
-                               .name("CSV: " + filename);
+               if (hasNullValueEdges(edges)) {
+                       edges
+                               .map(new EdgeToTuple2Map<K, EV>())
+                                       .name("Edge to Tuple2")
+                               .writeAsCsv(filename, lineDelimiter, 
fieldDelimiter)
+                                       .name("CSV: " + filename);
+               } else {
+                       edges
+                               .writeAsCsv(filename, lineDelimiter, 
fieldDelimiter)
+                                       .name("CSV: " + filename);
+               }
+       }
+
+       /**
+        * Check whether the edge type of the {@link DataSet} is {@link 
NullValue}.
+        *
+        * @param edges data set for introspection
+        * @param <T> graph ID type
+        * @param <ET> edge value type
+        * @return whether the edge type of the {@link DataSet} is {@link 
NullValue}
+        */
+       private static <T, ET> boolean hasNullValueEdges(DataSet<Edge<T, ET>> 
edges) {
+               TypeInformation<?> genericTypeInfo = edges.getType();
+               @SuppressWarnings("unchecked")
+               TupleTypeInfo<Tuple3<T, T, ET>> tupleTypeInfo = 
(TupleTypeInfo<Tuple3<T, T, ET>>) genericTypeInfo;
+
+               return 
tupleTypeInfo.getTypeAt(2).equals(ValueTypeInfo.NULL_VALUE_TYPE_INFO);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22475023/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
new file mode 100644
index 0000000..1e500ea
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Create a Tuple2 DataSet from the vertices of an Edge DataSet
+ *
+ * @param <K> edge ID type
+ * @param <EV> edge value type
+ */
+@ForwardedFields("f0; f1")
+public class EdgeToTuple2Map<K, EV> implements MapFunction<Edge<K, EV>, 
Tuple2<K, K>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private Tuple2<K, K> output = new Tuple2<>();
+
+       @Override
+       public Tuple2<K, K> map(Edge<K, EV> edge) {
+               output.f0 = edge.f0;
+               output.f1 = edge.f1;
+               return output;
+       }
+}

Reply via email to