Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1980#discussion_r63894471
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.library.similarity;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.GroupReduceFunction;
    +import org.apache.flink.api.common.operators.Order;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
    +import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
    +import org.apache.flink.graph.utils.Murmur3_32;
    +import org.apache.flink.types.CopyableValue;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * The Jaccard Index measures the similarity between vertex neighborhoods.
    + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common).
    + * <br/>
    + * This implementation produces similarity scores for each pair of vertices
    + * in the graph with at least one common neighbor; equivalently, this is the
    + * set of all non-zero Jaccard Similarity coefficients.
    + * <br/>
    + * The input graph must be a simple, undirected graph containing no duplicate
    + * edges or self-loops.
    + *
    + * @param <K> graph ID type
    + * @param <VV> vertex value type
    + * @param <EV> edge value type
    + */
    +public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
    +implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
    +
    +   public static final int DEFAULT_GROUP_SIZE = 64;
    +
    +   // Optional configuration
    +   private int groupSize = DEFAULT_GROUP_SIZE;
    +
    +   private boolean unboundedScores = true;
    +
    +   private int minimumScoreNumerator = 0;
    +
    +   private int minimumScoreDenominator = 1;
    +
    +   private int maximumScoreNumerator = 1;
    +
    +   private int maximumScoreDenominator = 0;
    +
    +   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +   /**
    +    * Override the default group size for the quadratic expansion of neighbor
    +    * pairs. Small groups generate more data whereas large groups distribute
    +    * computation less evenly among tasks.
    +    *
    +    * @param groupSize the group size for the quadratic expansion of neighbor pairs
    +    * @return this
    +    */
    +   public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) {
    +           Preconditions.checkArgument(groupSize > 0, "Group size must be greater than zero");
    +
    +           this.groupSize = groupSize;
    +
    +           return this;
    +   }
    +
    +   /**
    +    * Filter out Jaccard Index scores less than the given minimum fraction.
    +    *
    +    * @param numerator numerator of the minimum score
    +    * @param denominator denominator of the minimum score
    +    * @return this
    +    * @see #setMaximumScore(int, int)
    +    */
    +   public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int denominator) {
    +           Preconditions.checkArgument(numerator >= 0, "Minimum score numerator must be non-negative");
    +           Preconditions.checkArgument(denominator > 0, "Minimum score denominator must be greater than zero");
    +           Preconditions.checkArgument(numerator <= denominator, "Minimum score fraction must be less than or equal to one");
    +
    +           this.unboundedScores = false;
    +           this.minimumScoreNumerator = numerator;
    +           this.minimumScoreDenominator = denominator;
    +
    +           return this;
    +   }
    +
    +   /**
    +    * Filter out Jaccard Index scores greater than or equal to the given maximum fraction.
    +    *
    +    * @param numerator numerator of the maximum score
    +    * @param denominator denominator of the maximum score
    +    * @return this
    +    * @see #setMinimumScore(int, int)
    +    */
    +   public JaccardIndex<K, VV, EV> setMaximumScore(int numerator, int denominator) {
    +           Preconditions.checkArgument(numerator >= 0, "Maximum score numerator must be non-negative");
    +           Preconditions.checkArgument(denominator > 0, "Maximum score denominator must be greater than zero");
    +           Preconditions.checkArgument(numerator <= denominator, "Maximum score fraction must be less than or equal to one");
    +
    +           this.unboundedScores = false;
    +           this.maximumScoreNumerator = numerator;
    +           this.maximumScoreDenominator = denominator;
    +
    +           return this;
    +   }
    +
    +   /**
    +    * Override the parallelism of operators processing small amounts of data.
    +    *
    +    * @param littleParallelism operator parallelism
    +    * @return this
    +    */
    +   public JaccardIndex<K, VV, EV> setLittleParallelism(int littleParallelism) {
    +           this.littleParallelism = littleParallelism;
    +
    +           return this;
    +   }
    +
    +   /*
    +    * Implementation notes:
    +    *
    +    * The requirement that "K extends CopyableValue<K>" can be removed when
    +    *   Flink has a self-join which performs the skew distribution handled by
    +    *   GenerateGroupSpans / GenerateGroups / GenerateGroupPairs.
    +    */
    +
    +   @Override
    +   public DataSet<Result<K>> run(Graph<K, VV, EV> input)
    --- End diff --
    
    Can you please add Javadoc for the `Result` type?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to