[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-216280065 Implemented in 08e4bf9440a566c874c2b8e74ae2127ff264c672 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1900 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-216277509 Updated docs. Merging ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-216042040 Thanks for the updates @greghogan. Could you please also add the new `GraphAlgorithm`s in the docs? And maybe explain when to use those instead of the Graph methods? Otherwise, it's good to merge imo. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215738750 I pushed the most recent changes. In order of things I care more about to things I care less about: The indentation is as intended. I find method chaining on a single line quite difficult to follow. I find that algorithms are much easier to read if separate operators are easily identifiable. Cramming `implements`, `extends`, and `throws` onto a single line is also difficult to parse, particularly for complicated nested parameters which may themselves extend interfaces. There are empty lines throughout the code. The remaining two look fine to me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61583101 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. +* +* @param offset value to be added to each element +*/ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { + output.setValue(offset + value.getValue()); + --- End diff -- Fixed in next push. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61582214 --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala --- @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Relabels vertices and edges using the given MapFunction. + * + * @param translator implements conversion from K to NEW + * @return relabeled graph + */ + def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = { --- End diff -- Fixed in next push. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61579289 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +*/ + @ForwardedFields("1") + private static class TranslateVertexLabel
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552674 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. + * + * @param vertex label type + * @param vertex value type + * @param old edge value type + * @param new edge value type + */ +public class TranslateEdgeValues +implements GraphAlgorithm> { + + // Required configuration + private MapFunction translator; + + // Optional configuration + private int parallelism = PARALLELISM_UNKNOWN; + + /** +* Translate {@link Edge} values using the given {@link MapFunction}. +* +* @param translator implements conversion from {@code OLD} to {@code NEW} +*/ + public TranslateEdgeValues(MapFunction translator) { + Preconditions.checkNotNull(translator); + --- End diff -- empty line can be removed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552700 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. + * + * @param vertex label type + * @param vertex value type + * @param old edge value type + * @param new edge value type + */ +public class TranslateEdgeValues +implements GraphAlgorithm> { + + // Required configuration + private MapFunction translator; + + // Optional configuration + private int parallelism = PARALLELISM_UNKNOWN; + + /** +* Translate {@link Edge} values using the given {@link MapFunction}. +* +* @param translator implements conversion from {@code OLD} to {@code NEW} +*/ + public TranslateEdgeValues(MapFunction translator) { + Preconditions.checkNotNull(translator); + + this.translator = translator; + } + + /** +* Override the operator parallelism. +* +* @param parallelism operator parallelism +* @return this +*/ + public TranslateEdgeValues setParallelism(int parallelism) { + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + this.parallelism = parallelism; + --- End diff -- empty line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552592 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues; + +/** + * Translate {@link Edge} values using the given {@link MapFunction}. + * + * @param vertex label type + * @param vertex value type + * @param old edge value type + * @param new edge value type + */ +public class TranslateEdgeValues +implements GraphAlgorithm> { --- End diff -- can be moved to the previous line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552498 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +*/ + @ForwardedFields("1") + private static class TranslateVertexLabel +
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552416 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +*/ + @ForwardedFields("1") + private static class TranslateVertexLabel +
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61552437 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +*/ + @ForwardedFields("1") + private static class TranslateVertexLabel +
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551910 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) + .name("Translate vertex labels"); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +*/ + @ForwardedFields("1") + private static class TranslateVertexLabel +
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551647 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { + Preconditions.checkNotNull(vertices); + Preconditions.checkNotNull(translator); + Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN, + "The parallelism must be greater than zero."); + + Class> vertexClass = (Class>)(Class) Vertex.class; + TypeInformation newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null); + TypeInformation vertexValueType = ((TupleTypeInfo>) vertices.getType()).getTypeAt(1); + + TupleTypeInfo> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType); + + return vertices + .map(new TranslateVertexLabel(translator)) + .returns(returnType) + .setParallelism(parallelism) --- End diff -- indention is off --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551561 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { + return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + @SuppressWarnings("unchecked") + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator, int parallelism) { --- End diff -- spaces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551440 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java --- @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link MapFunction}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, MapFunction translator) { --- End diff -- can you leave one space after commas? i.e. `` instead of ``? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551292 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. +* +* @param offset value to be added to each element +*/ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { + output.setValue(offset + value.getValue()); + --- End diff -- empty line can be removed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551265 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; + +/** + * Translate {@link LongValue} to {@link StringValue}. + */ +public class LongValueToStringValue +implements MapFunction { --- End diff -- same :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551238 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} to {@link IntValue}. + * + * Throws {@link RuntimeException} for integer overflow. + */ +public class LongValueToIntValue +implements MapFunction { + + private IntValue output = new IntValue(); + + @Override + public IntValue map(LongValue value) + throws Exception { --- End diff -- can be moved to the line above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551225 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} to {@link IntValue}. + * + * Throws {@link RuntimeException} for integer overflow. + */ +public class LongValueToIntValue +implements MapFunction { --- End diff -- can be moved to the line above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551172 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction { + + private final long offset; + + private LongValue output = new LongValue(); + + /** +* Translate {@link LongValue} by adding a constant offset value. +* +* @param offset value to be added to each element +*/ + public LongValueAddOffset(long offset) { + this.offset = offset; + } + + @Override + public LongValue map(LongValue value) + throws Exception { --- End diff -- can be moved to the line above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61551105 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.translate; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.types.LongValue; + +/** + * Translate {@link LongValue} by adding a constant offset value. + */ +public class LongValueAddOffset +implements MapFunction { --- End diff -- can go in the line above --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61550681 --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala --- @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) { } /** + * Relabels vertices and edges using the given MapFunction. + * + * @param translator implements conversion from K to NEW + * @return relabeled graph + */ + def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = { --- End diff -- In the the Gelly code and docs, we refer to the vertex and edge keys as "Ids", not labels. I think we should rename the methods and javadocs to be consistent. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215663664 My intention was to simplify the API, not have different ways to do the same thing. Anyway, if you think it's useful to have both the methods and the algorithms, I won't insist :) Can you please add a section in the Gelly guide describing how/when to use the translators? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215451225 One can now choose between: `graph.translateEdgeValues(new LongValueToStringValue()))` `graph.run(new TranslateEdgeValues<>(new LongValueToStringValue()))` The latter is slightly longer but customizable with parallelism, etc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215444763 In Java 8 this is {{graph.run(new TranslateEdgeValues<>(new LongValueToStringValue()))}}, see "Target Types" from https://docs.oracle.com/javase/tutorial/java/generics/genTypeInference.html. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215440722 There's no reason we can't have both algorithms and methods on `Graph` which run the algorithm and ignore customization. I'll add this shortly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215432236 We can't translate vertex or edge IDs without modifying both as long as we want to return a `Graph` Setting parallelism is critical to scaling Flink to large datasets which doubles the function count. `graph.run(new TranslateEdgeValues(new LongValueToStringValue()))` is verbose due to the parameterization and we should definitely try to find a way to improve that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215428452 Hi @greghogan, I feel we're over-engineering something that should be way simpler... Translators are just a set of `MapFunction`s applied on vertex or edge ids and values. I think that the current API is way too complex for what it does, e.g. `graph.run(new TranslateEdgeValues(new LongValueToStringValue()))` seems too verbose to me for mapping an edge value. It could be as simple as `graph.mapEdges(new LongValueToStringValue())`. Also, I don't think we need 6 methods. We could either add 4 methods to `Graph`, i.e. `mapVertexIDs`, `mapVertexValues`, `mapEdgeIds`, `mapEdgeValues` or extend the `mapEdges` and `mapVertices` methods. In the latter case, we will have to provide specialized mappers as you note above, e.g. `MapVertexValueLongToString`, `MapEdgeValueLongToString`, etc., but these can internally wrap the same mapper. Please let me know your thought on this! Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215167381 The switch to `MapFunction` means that a user can use a `RichMapFunction` with open, close, accumulators, etc. Rather than add six new methods to `Graph` I implemented these with three `GraphAlgorithm` which use the Gellyish `Graph.run(GraphAlgorithm)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215040606 I think this is a very good idea, and we can wrap the user's MapFunction to function like a MapFunction. Now we can start discussing algorithms :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215012658 How about we add the translate methods to the `Graph` class and implement translators as `MapFunction`? This way they can be used even if don't have a graph but a dataset of vertices or edges. There is no requirement to have the mappers operate on the vertex and edge level. Does this sound like a good idea? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-214825734 By reuse I was referring to needing `MapVertexValueLongToString` and `MapEdgeValueLongToString` since the current `Graph` methods operate at the level of `Vertex` and `Edge`. And then to translate labels you need `MapVertexLabelLongToString` and `MapEdgeLabelLongToString`. In this PR we simply have a single `LongToString` which can be used to translate vertex labels, edge labels, vertex values, and edge values. I agree that `Translate` methods taking a `Graph` as input would be better as methods on `Graph`. And if we don't implement as `MapFunction` then we still need `Translate` to handle the `TypeInformation`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-214819709 Implementations of `MapFunction`s can also be reused :) What I was thinking was to provide the translator simply as map functions, e.g. like `Tuple2ToVertexMap`. We can add them to `org.apache.flink.graph.utils` or create a subpackage for that. Then we can add the `translate*` methods to `Graph`. If someone wants to use the provided translator on a dataset of vertices or edges, they can simply do this with a mapper. My concern is that we should try to be consistent with the existing Gelly API. e.g. something like `graph.translateIds(new LongValueToStringValue())` is Gelly-like, while `Translate.translateGraphLabels(graph, new LongValueToStringValue())` is not. Also, I think that this feature is simple enough to be implemented as a collection of map functions instead of a separate utility. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-214782976 Hi @vasia, Comparing the APIs, `Translate` processes at the level of label or value whereas `mapVertices/Edges` process vertices and edges. What are use cases for the extra context (namely, the label) provided by `mapVertices/Edges`? Implementations of `Translator` can be reused across functions whereas the `MapFunction` provided to `mapVertices/Edges` are specific to that method. Would `mapLabels` require two new `MapFunction` or use an interface like `Translator`? `Translator` is wrapped by a specific `MapFunction` for each `Translate` function. Only one of the five current `Translate` functions operates on a `Graph`. `tranlateGraphLabels` could be moved to `Graph`, and there could be a `translateVertexValues` and `translateEdgeValues` also added to `Graph`. We don't always have a `Graph`, sometimes algorithms return a `DataSet` of vertices or edges, so having the static methods is helpful for these specialized `Vertex` and `Edge` operations. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1900#discussion_r61083553 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/translate/Translate.java --- @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.translate; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; + +/** + * Methods for translation of the type or modification of the data of graph + * labels, vertex values, and edge values. + */ +public class Translate { + + // + // Translate graph labels + // + + /** +* Relabels {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} using the given {@link Translator}. +* +* @param graph input graph +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old graph label type +* @param new graph label type +* @param vertex value type +* @param edge value type +* @return translated graph +*/ + public static Graph translateGraphLabels(Graph graph, Translator translator) { + return translateGraphLabels(graph, translator, ExecutionConfig.PARALLELISM_UNKNOWN); + } + + /** +* Relabels {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} using the given {@link Translator}. +* +* @param graph input graph +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param parallelism operator parallelism +* @param old graph label type +* @param new graph label type +* @param vertex value type +* @param edge value type +* @return translated graph +*/ + public static Graph translateGraphLabels(Graph graph, Translator translator, int parallelism) { + // Vertices + DataSet> translatedVertices = translateVertexLabels(graph.getVertices(), translator, parallelism); + + // Edges + DataSet> translatedEdges = translateEdgeLabels(graph.getEdges(), translator, parallelism); + + // Graph + return Graph.fromDataSet(translatedVertices, translatedEdges, graph.getContext()); + } + + // + // Translate vertex labels + // + + /** +* Translate {@link Vertex} labels using the given {@link Translator}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param old vertex label type +* @param new vertex label type +* @param vertex value type +* @return translated vertices +*/ + public static DataSet> translateVertexLabels(DataSet> vertices, Translator translator) { + return translateVertexLabels(vertices, translator, ExecutionConfig.PARALLELISM_UNKNOWN); + } + + /** +* Translate {@link Vertex} labels using the given {@link Translator}. +* +* @param vertices input vertices +* @param translator implements conversion from {@code OLD} to {@code NEW} +* @param p
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-214733082 Hi @greghogan, I looked at the PR in more detail and I have some thoughts. First, why do we need a `Translate` class and `Translator` interface? Can't we simply add methods in the `Graph` class that wrap a `MapFunction` or receive it as an argument like in `mapVertices`? Second, translating values is already supported by `mapVertices` and `mapEdges`. Couldn't we simply extend these methods to also support changing the vertex key types? Let me know what you think! Thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/1900 [FLINK-3771] [gelly] Methods for translating Graphs Methods for translation of the type or value of graph labels, vertex values, and edge values. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 3771_methods_for_translating_graphs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1900.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1900 commit 0f9d0ed4d7ca8bcaa4651339cacf49ed1eec3667 Author: Greg Hogan Date: 2016-04-16T11:06:44Z [FLINK-3771] [gelly] Methods for translating Graphs Methods for translation of the type or value of graph labels, vertex values, and edge values. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---