Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/1980#discussion_r63895157
--- Diff:
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are
common).
+ * <br/>
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * <br/>
+ * The input graph must be a simple, undirected graph containing no
duplicate
+ * edges or self-loops.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+
+ public static final int DEFAULT_GROUP_SIZE = 64;
+
+ // Optional configuration
+ private int groupSize = DEFAULT_GROUP_SIZE;
+
+ private boolean unboundedScores = true;
+
+ private int minimumScoreNumerator = 0;
+
+ private int minimumScoreDenominator = 1;
+
+ private int maximumScoreNumerator = 1;
+
+ private int maximumScoreDenominator = 0;
+
+ private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+ /**
+ * Override the default group size for the quadratic expansion of
neighbor
+ * pairs. Small groups generate more data whereas large groups
distribute
+ * computation less evenly among tasks.
+ *
+ * @param groupSize the group size for the quadratic expansion of
neighbor pairs
+ * @return this
+ */
+ public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) {
+ Preconditions.checkArgument(groupSize > 0, "Group size must be
greater than zero");
+
+ this.groupSize = groupSize;
+
+ return this;
+ }
+
+ /**
+ * Filter out Jaccard Index scores less than the given minimum fraction.
+ *
+ * @param numerator numerator of the minimum score
+ * @param denominator denominator of the minimum score
+ * @return this
+ * @see #setMaximumScore(int, int)
+ */
+ public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int
denominator) {
+ Preconditions.checkArgument(numerator >= 0, "Minimum score
numerator must be non-negative");
+ Preconditions.checkArgument(denominator > 0, "Minimum score
denominator must be greater than zero");
+ Preconditions.checkArgument(numerator <= denominator, "Minimum
score fraction must be less than or equal to one");
+
+ this.unboundedScores = false;
+ this.minimumScoreNumerator = numerator;
+ this.minimumScoreDenominator = denominator;
+
+ return this;
+ }
+
+ /**
+ * Filter out Jaccard Index scores greater than or equal to the given
maximum fraction.
+ *
+ * @param numerator numerator of the maximum score
+ * @param denominator denominator of the maximum score
+ * @return this
+ * @see #setMinimumScore(int, int)
+ */
+ public JaccardIndex<K, VV, EV> setMaximumScore(int numerator, int
denominator) {
+ Preconditions.checkArgument(numerator >= 0, "Maximum score
numerator must be non-negative");
+ Preconditions.checkArgument(denominator > 0, "Maximum score
denominator must be greater than zero");
+ Preconditions.checkArgument(numerator <= denominator, "Maximum
score fraction must be less than or equal to one");
+
+ this.unboundedScores = false;
+ this.maximumScoreNumerator = numerator;
+ this.maximumScoreDenominator = denominator;
+
+ return this;
+ }
+
+ /**
+ * Override the parallelism of operators processing small amounts of
data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public JaccardIndex<K, VV, EV> setLittleParallelism(int
littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ /*
+ * Implementation notes:
+ *
+ * The requirement that "K extends CopyableValue<K>" can be removed when
+ * Flink has a self-join which performs the skew distribution handled
by
+ * GenerateGroupSpans / GenerateGroups / GenerateGroupPairs.
+ */
+
+ @Override
+ public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // s, t, d(t)
+ DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
+ .run(new EdgeTargetDegree<K, VV, EV>()
+ .setParallelism(littleParallelism));
+
+ // group span, s, t, d(t)
+ DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans =
neighborDegree
+ .groupBy(0)
+ .sortGroup(1, Order.ASCENDING)
+ .reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
+ .setParallelism(littleParallelism)
+ .name("Generate group spans");
+
+ // group, s, t, d(t)
+ DataSet<Tuple4<IntValue, K, K, IntValue>> groups = groupSpans
+ .rebalance()
+ .setParallelism(littleParallelism)
+ .name("Rebalance")
+ .flatMap(new GenerateGroups<K>())
+ .setParallelism(littleParallelism)
+ .name("Generate groups");
+
+ // t, u, d(t)+d(u)
+ DataSet<Tuple3<K, K, IntValue>> twoPaths = groups
+ .groupBy(0, 1)
+ .sortGroup(2, Order.ASCENDING)
+ .reduceGroup(new GenerateGroupPairs<K>(groupSize))
+ .name("Generate group pairs");
+
+ // t, u, intersection, union
+ return twoPaths
+ .groupBy(0, 1)
+ .reduceGroup(new ComputeScores<K>(unboundedScores,
+ minimumScoreNumerator,
minimumScoreDenominator,
+ maximumScoreNumerator,
maximumScoreDenominator))
+ .name("Compute scores");
+ }
+
+ /**
+ * This is the first of three operations implementing a self-join to
generate
+ * the full neighbor pairing for each vertex. The number of neighbor
pairs
+ * is (n choose 2) which is quadratic in the vertex degree.
+ * <br/>
+ * The third operation, {@link GenerateGroupPairs}, processes groups of
size
+ * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))}
pairs.
+ * <br/>
+ * This input to the third operation is still quadratic in the vertex
degree.
+ * Two prior operations, {@link GenerateGroupSpans} and {@link
GenerateGroups},
+ * each emit datasets linear in the vertex degree, with a forced
rebalance
+ * in between. {@link GenerateGroupSpans} first annotates each edge
with the
+ * number of groups and {@link GenerateGroups} emits each edge into
each group.
--- End diff --
ð for the detailed comment!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---