[hotfix] [gelly] Don't print Null edge values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22475023 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22475023 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22475023 Branch: refs/heads/master Commit: 224750234758d4cc2234e98fd9d26ae794e0d669 Parents: 568e642 Author: Greg Hogan <c...@greghogan.com> Authored: Tue Apr 18 09:28:31 2017 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Tue Apr 18 09:28:31 2017 -0400 ---------------------------------------------------------------------- .../apache/flink/graph/drivers/EdgeList.java | 53 +++++++++++++++++--- .../flink/graph/utils/EdgeToTuple2Map.java | 45 +++++++++++++++++ 2 files changed, 91 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/22475023/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java index 85f32c3..524e70f 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java @@ -18,7 +18,11 @@ package org.apache.flink.graph.drivers; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.dataset.ChecksumHashCode; @@ -28,11 +32,17 @@ 
import org.apache.flink.graph.drivers.output.CSV; import org.apache.flink.graph.drivers.output.Hash; import org.apache.flink.graph.drivers.output.Print; import org.apache.flink.graph.drivers.parameter.ParameterizedBase; +import org.apache.flink.graph.utils.EdgeToTuple2Map; +import org.apache.flink.types.NullValue; import java.util.List; /** - * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s. + * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}. + * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type */ public class EdgeList<K, VV, EV> extends ParameterizedBase @@ -77,16 +87,45 @@ implements Driver<K, VV, EV>, CSV, Hash, Print { // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761 List<Edge<K, EV>> records = collector.run(edges).execute(executionName); - for (Edge<K, EV> result : records) { - System.out.println(result); + if (hasNullValueEdges(edges)) { + for (Edge<K, EV> result : records) { + System.out.println("(" + result.f0 + "," + result.f1 + ")"); + } + } else { + for (Edge<K, EV> result : records) { + System.out.println(result); + } } - } @Override public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) { - edges - .writeAsCsv(filename, lineDelimiter, fieldDelimiter) - .name("CSV: " + filename); + if (hasNullValueEdges(edges)) { + edges + .map(new EdgeToTuple2Map<K, EV>()) + .name("Edge to Tuple2") + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } else { + edges + .writeAsCsv(filename, lineDelimiter, fieldDelimiter) + .name("CSV: " + filename); + } + } + + /** + * Check whether the edge type of the {@link DataSet} is {@link NullValue}. 
+ * + * @param edges data set for introspection + * @param <T> graph ID type + * @param <ET> edge value type + * @return whether the edge type of the {@link DataSet} is {@link NullValue} + */ + private static <T, ET> boolean hasNullValueEdges(DataSet<Edge<T, ET>> edges) { + TypeInformation<?> genericTypeInfo = edges.getType(); + @SuppressWarnings("unchecked") + TupleTypeInfo<Tuple3<T, T, ET>> tupleTypeInfo = (TupleTypeInfo<Tuple3<T, T, ET>>) genericTypeInfo; + + return tupleTypeInfo.getTypeAt(2).equals(ValueTypeInfo.NULL_VALUE_TYPE_INFO); } } http://git-wip-us.apache.org/repos/asf/flink/blob/22475023/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java new file mode 100644 index 0000000..1e500ea --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple2Map.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.utils; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; + +/** + * Create a Tuple2 DataSet from the vertices of an Edge DataSet. + * + * @param <K> graph ID type + * @param <EV> edge value type + */ +@ForwardedFields("f0; f1") +public class EdgeToTuple2Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple2<K, K>> { + + private static final long serialVersionUID = 1L; + + private Tuple2<K, K> output = new Tuple2<>(); + + @Override + public Tuple2<K, K> map(Edge<K, EV> edge) { + output.f0 = edge.f0; + output.f1 = edge.f1; + return output; + } +}