[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263795#comment-15263795 ]
ASF GitHub Bot commented on FLINK-3771: --------------------------------------- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552416 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // -------------------------------------------------------------------------------------------- + // Translate vertex labels + // -------------------------------------------------------------------------------------------- + + /** + * Translate {@link Vertex} labels using the given {@link MapFunction}. + * + * @param vertices input vertices + * @param translator implements conversion from {@code OLD} to {@code NEW} + * @param <OLD> old vertex label type + * @param <NEW> new vertex label type + * @param <VV> vertex value type + * @return translated vertices + */ + public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** + * Translate {@link Vertex} labels using the given {@link MapFunction}. + * + * @param vertices input vertices + * @param translator implements conversion from {@code OLD} to {@code NEW} + * @param parallelism operator parallelism + * @param <OLD> old vertex label type + * @param <NEW> new vertex label type + * @param <VV> vertex value type + * @return translated vertices + */ + @SuppressWarnings("unchecked") + public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class; + TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel<OLD,NEW,VV>(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** + * Translate {@link Vertex} labels using the given {@link MapFunction}. + * + * @param <OLD> old vertex label type + * @param <NEW> new vertex label type + * @param <VV> vertex value type + */ + @ForwardedFields("1") + private static class TranslateVertexLabel<OLD,NEW,VV> + extends WrappingFunction<MapFunction<OLD,NEW>> + implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> { + private Vertex<NEW,VV> vertex = new Vertex<>(); + + public TranslateVertexLabel(MapFunction<OLD,NEW> translator) { --- End diff -- make the constructor private? > Methods for translating Graphs > ------------------------------ > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly > Affects Versions: 1.1.0 > Reporter: Greg Hogan > Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)