[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation
[ https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633150#comment-14633150 ]

ASF GitHub Bot commented on FLINK-2362:
---------------------------------------

Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/922#discussion_r34974826

--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 **Note:** Extending the set of supported aggregation functions is on our roadmap.

+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to "The following code removes duplicate elements from the DataSet:"

distinct is missing in DataSet API documentation
------------------------------------------------

Key: FLINK-2362
URL: https://issues.apache.org/jira/browse/FLINK-2362
Project: Flink
Issue Type: Bug
Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
Fix For: 0.10, 0.9.1

The DataSet transformation {{distinct}} is not described or listed in the documentation. It is not contained in the DataSet API programming guide (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html) and not in the DataSet Transformations page (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html).
[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...
Github user mxm commented on the pull request:
https://github.com/apache/flink/pull/922#issuecomment-122805962

Thanks for improving the documentation. Looks good to merge for me.
[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation
[ https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633152#comment-14633152 ]

ASF GitHub Bot commented on FLINK-2362:
---------------------------------------

Github user mxm commented on the pull request:
https://github.com/apache/flink/pull/922#issuecomment-122805962

Thanks for improving the documentation. Looks good to merge for me.

distinct is missing in DataSet API documentation
------------------------------------------------

Key: FLINK-2362
URL: https://issues.apache.org/jira/browse/FLINK-2362
Project: Flink
Issue Type: Bug
Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
Fix For: 0.10, 0.9.1

The DataSet transformation {{distinct}} is not described or listed in the documentation. It is not contained in the DataSet API programming guide (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html) and not in the DataSet Transformations page (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html).
[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/922#discussion_r34974826

--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 **Note:** Extending the set of supported aggregation functions is on our roadmap.

+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to "The following code removes duplicate elements from the DataSet:"
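For readers skimming the thread, the transformation under discussion can be exercised with a few lines against the Scala DataSet API. This is an illustrative sketch, not code from the pull request:

import org.apache.flink.api.scala._

object DistinctExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Contains duplicate tuples on purpose
    val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "a"), (2, "b"))

    // Full-tuple distinct: yields (1,"a") and (2,"b")
    val unique = input.distinct()

    // Distinct on a field subset: keeps one tuple per value of field 0
    val uniqueByKey = input.distinct(0)

    unique.print()
  }
}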
[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation
[ https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633130#comment-14633130 ]

ASF GitHub Bot commented on FLINK-1723:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34973140

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce-union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Given that `input` is deterministic, this code will produce mutually exclusive validation and training data sets.

Add cross validation for model evaluation
------------------------------------------

Key: FLINK-1723
URL: https://issues.apache.org/jira/browse/FLINK-1723
Project: Flink
Issue Type: New Feature
Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore
[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation
[ https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633136#comment-14633136 ]

ASF GitHub Bot commented on FLINK-1723:
---------------------------------------

Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34973783

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Ah yes. I see that now.

Add cross validation for model evaluation
------------------------------------------

Key: FLINK-1723
URL: https://issues.apache.org/jira/browse/FLINK-1723
Project: Flink
Issue Type: New Feature
Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Labels: ML

Cross validation [1] is a standard tool to estimate
[jira] [Updated] (FLINK-2379) Add methods to evaluate field-wise statistics over DataSet of vectors.
[ https://issues.apache.org/jira/browse/FLINK-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Goel updated FLINK-2379:
-------------------------------

Description:
Design methods to evaluate statistics over a dataset of vectors.
For continuous fields: minimum, maximum, mean, variance.
For discrete fields: class counts, entropy, Gini impurity.
Further statistical measures can also be supported, for example, correlation between two series, computing the covariance matrix, etc. [These are currently the things Spark supports.]

was:
Design methods to evaluate statistics over a dataset of vectors.
For continuous fields: minimum, maximum, mean, variance.
For discrete fields: class counts, entropy, Gini impurity.

Issue Type: New Feature (was: Bug)

Add methods to evaluate field-wise statistics over DataSet of vectors.
-----------------------------------------------------------------------

Key: FLINK-2379
URL: https://issues.apache.org/jira/browse/FLINK-2379
Project: Flink
Issue Type: New Feature
Components: Machine Learning Library
Reporter: Sachin Goel

Design methods to evaluate statistics over a dataset of vectors.
For continuous fields: minimum, maximum, mean, variance.
For discrete fields: class counts, entropy, Gini impurity.
Further statistical measures can also be supported, for example, correlation between two series, computing the covariance matrix, etc. [These are currently the things Spark supports.]
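As an illustration of what such field-wise methods could look like for the continuous measures, here is a hypothetical single-pass sketch over a DataSet of dense vectors; FieldStats and fieldStats are made-up names for this sketch, not part of any existing Flink API:

import org.apache.flink.api.scala._

// Per-field accumulator: count, sum, sum of squares, min, max
case class FieldStats(count: Long, sum: Double, sumSq: Double, min: Double, max: Double) {
  def merge(that: FieldStats): FieldStats =
    FieldStats(count + that.count, sum + that.sum, sumSq + that.sumSq,
      math.min(min, that.min), math.max(max, that.max))

  def mean: Double = sum / count

  // Population variance via E[X^2] - E[X]^2; a numerically stabler merge
  // (e.g. Welford/Chan) would be preferable in a real implementation.
  def variance: Double = sumSq / count - mean * mean
}

object VectorFieldStatistics {
  // One FieldStats per vector component, computed in a single reduce pass.
  def fieldStats(data: DataSet[Array[Double]]): DataSet[Array[FieldStats]] =
    data
      .map(vec => vec.map(x => FieldStats(1L, x, x * x, x, x)))
      .reduce((a, b) => a.zip(b).map { case (l, r) => l.merge(r) })
}

The discrete measures (class counts, entropy, Gini impurity) would follow the same pattern with a per-field histogram as the accumulator.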
[GitHub] flink pull request: [FLINK-2075] Add Approximate Adamic Adar Simil...
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/923#issuecomment-122784993

I think you've referenced the wrong JIRA ticket.
[jira] [Commented] (FLINK-2075) Shade akka and protobuf dependencies away
[ https://issues.apache.org/jira/browse/FLINK-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633114#comment-14633114 ]

ASF GitHub Bot commented on FLINK-2075:
---------------------------------------

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/923#issuecomment-122784993

I think you've referenced the wrong JIRA ticket.

Shade akka and protobuf dependencies away
-----------------------------------------

Key: FLINK-2075
URL: https://issues.apache.org/jira/browse/FLINK-2075
Project: Flink
Issue Type: Improvement
Components: Build System
Reporter: Till Rohrmann
Fix For: 0.9

Lately, the Zeppelin project encountered the following problem: It includes flink-runtime, which depends on akka_remote:2.3.7, which again depends on protobuf-java:2.5.0. However, Zeppelin set the protobuf-java version to 2.4.1 to make it build with YARN 2.2. Due to this, akka_remote finds a wrong protobuf-java version and fails because of an incompatible change between these versions. I propose to shade Flink's akka dependency and protobuf dependency away, so that user projects depending on Flink are not forced to use a special akka/protobuf version.
[jira] [Commented] (FLINK-2312) Random Splits
[ https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633118#comment-14633118 ]

Maximilian Alber commented on FLINK-2312:
-----------------------------------------

I agree too. Something else: How do you ensure the ratios? As I see it, they are only approximately ensured when you have a large number of samples.

Random Splits
-------------

Key: FLINK-2312
URL: https://issues.apache.org/jira/browse/FLINK-2312
Project: Flink
Issue Type: Wish
Components: Machine Learning Library
Reporter: Maximilian Alber
Assignee: pietro pinoli
Priority: Minor

In machine learning applications it is common to split data sets into, e.g., training and testing sets. To the best of my knowledge, there is at the moment no nice way in Flink to split a data set randomly into several partitions according to some ratio. The desired semantics would be the same as Spark's RDD randomSplit.
[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...
Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34973783

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Ah yes. I see that now.
[GitHub] flink pull request: [FLINK-2312][ml][WIP] Randomly Splitting a Dat...
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/921#issuecomment-122785194

I agree, a generalized implementation would be favorable.
[jira] [Commented] (FLINK-2312) Random Splits
[ https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633116#comment-14633116 ]

ASF GitHub Bot commented on FLINK-2312:
---------------------------------------

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/921#issuecomment-122785194

I agree, a generalized implementation would be favorable.

Random Splits
-------------

Key: FLINK-2312
URL: https://issues.apache.org/jira/browse/FLINK-2312
Project: Flink
Issue Type: Wish
Components: Machine Learning Library
Reporter: Maximilian Alber
Assignee: pietro pinoli
Priority: Minor

In machine learning applications it is common to split data sets into, e.g., training and testing sets. To the best of my knowledge, there is at the moment no nice way in Flink to split a data set randomly into several partitions according to some ratio. The desired semantics would be the same as Spark's RDD randomSplit.
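On the ratio question raised above: a split based on independent Bernoulli trials meets the ratio only in expectation, which is why it is approximate for small data sets. A minimal sketch, assuming element hash codes are reasonably well distributed; the helper below is hypothetical, not an existing Flink API:

import org.apache.flink.api.scala._
import scala.util.Random

object RandomSplit {
  // Keep each element with probability `ratio`. Deriving the coin flip from
  // the seed and the element's hash makes both filters agree on every
  // element, so the two outputs partition the input exactly; only the split
  // sizes are approximate, with the deviation shrinking as the data grows.
  def randomSplit[T](data: DataSet[T], ratio: Double, seed: Long): (DataSet[T], DataSet[T]) = {
    def keep(x: T): Boolean = new Random(seed ^ x.hashCode()).nextDouble() < ratio
    (data.filter(x => keep(x)), data.filter(x => !keep(x)))
  }
}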
[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34973140

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Given that `input` is deterministic, this code will produce mutually exclusive validation and training data sets.
[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633162#comment-14633162 ]

Stephan Ewen commented on FLINK-2377:
-------------------------------------

Can you verify that this is a problem of the test, i.e., that the test forgets to close the files?

AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
--------------------------------------------------------------

Key: FLINK-2377
URL: https://issues.apache.org/jira/browse/FLINK-2377
Project: Flink
Issue Type: Bug
Components: Tests
Environment: Windows
Reporter: Gabor Gevay
Priority: Minor

This is probably another file closing issue (that is, Windows won't delete open files, as opposed to Linux). I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest.
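For context, the Windows failure mode typically looks like this: a test writes a temporary file, keeps the stream open, and the temp-file cleanup's delete then fails (Linux, by contrast, deletes open files without complaint). A minimal illustration of the close-before-delete discipline, not taken from the test code in question:

import java.io.{BufferedWriter, FileWriter}
import java.nio.file.{Files, Paths}

object TempFileCleanup {
  def main(args: Array[String]): Unit = {
    val path = Paths.get(System.getProperty("java.io.tmpdir"), "flink-test-output.tmp")
    val writer = new BufferedWriter(new FileWriter(path.toFile))
    try {
      writer.write("test output")
    } finally {
      // Without this close, Files.delete fails on Windows while the
      // handle is still open.
      writer.close()
    }
    Files.delete(path)
  }
}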
[jira] [Resolved] (FLINK-2027) Flink website does not provide link to source repo
[ https://issues.apache.org/jira/browse/FLINK-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ufuk Celebi resolved FLINK-2027.
--------------------------------
Resolution: Fixed

I think this was reopened by accident.

Flink website does not provide link to source repo
--------------------------------------------------

Key: FLINK-2027
URL: https://issues.apache.org/jira/browse/FLINK-2027
Project: Flink
Issue Type: Bug
Reporter: Sebb
Priority: Critical

As the subject says: I could not find a link to the source repo anywhere obvious on the website.
[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation
[ https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633184#comment-14633184 ]

ASF GitHub Bot commented on FLINK-1723:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34976620

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

The test shouldn't fail. Maybe there is an error then. What one could do to have different sequences on each node is to XOR the subtask id with the seed. But IMO this does not change the statistical properties of the sample, because we don't know the underlying order of the elements. E.g. the underlying order of the elements could be such that we obtain the same sample set as with an identical seed and a different order.

Add cross validation for model evaluation
[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34976620

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

The test shouldn't fail. Maybe there is an error then. What one could do to have different sequences on each node is to XOR the subtask id with the seed. But IMO this does not change the statistical properties of the sample, because we don't know the underlying order of the elements. E.g. the underlying order of the elements could be such that we obtain the same sample set as with an identical seed and a different order.
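A sketch of the seed-XOR-subtask-id idea from the comment above, written as a stand-alone rich function; `sampleBounded` belongs to the pull request, so this sampler is only an illustration of how each parallel instance could derive its own random sequence from a shared seed:

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
import scala.util.Random

// Keeps elements whose draw falls into [lb, ub), or the complement of that
// range for the training side. XOR-ing the subtask index into the seed
// decorrelates the random sequences of the parallel instances.
class BoundedSampler[T](lb: Double, ub: Double, complement: Boolean, seed: Long)
    extends RichFilterFunction[T] {

  @transient private var rnd: Random = _

  override def open(parameters: Configuration): Unit = {
    rnd = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
  }

  override def filter(value: T): Boolean = {
    val p = rnd.nextDouble()
    val inFold = p >= lb && p < ub
    if (complement) !inFold else inFold
  }
}

A fold's validation set would then be input.filter(new BoundedSampler(lb, ub, complement = false, seed)), and its training set the same call with complement = true.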
[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...
Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34975724

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

However, in case the parallelism of the data is more than one, this can lead to problems. The random number sequence generated on every node would be the same, wouldn't it? I printed all the random numbers generated and it looks like this: https://gist.github.com/sachingoel0101/ecde269af996fba7a39a
Further, for a parallelism of 2, the test itself fails.
[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...
Github user pp86 commented on the pull request:
https://github.com/apache/flink/pull/922#issuecomment-122815918

Thanks for the suggestion @mxm, I updated the documentation.
[jira] [Commented] (FLINK-2367) "flink-xx-jobmanager-linux-3lsu.log" file can't be automatically recovered/detected after mistaken delete
[ https://issues.apache.org/jira/browse/FLINK-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633163#comment-14633163 ]

Maximilian Michels commented on FLINK-2367:
-------------------------------------------

I'm afraid the logger is unaware of the deleted file. We would have to manually check if the file still exists, but that seems to be too much effort for such a corner case. However, we could add a note in the documentation that log file deletion during runtime is not supported.

"flink-xx-jobmanager-linux-3lsu.log" file can't be automatically recovered/detected after mistaken delete
---------------------------------------------------------------------------------------------------------

Key: FLINK-2367
URL: https://issues.apache.org/jira/browse/FLINK-2367
Project: Flink
Issue Type: Improvement
Components: JobManager
Affects Versions: 0.9
Environment: Linux
Reporter: chenliang
Priority: Minor
Labels: reliability
Fix For: 0.9.0

To check whether the system is adequately reliable, testers sometimes deliberately delete files.
Steps:
1. Go to flink/build-target/log.
2. Delete the "flink-xx-jobmanager-linux-3lsu.log" file.
3. Run jobs that write log info; the system does not give any error even though the log info can no longer be written correctly.
4. When some jobs fail and you check the log file to find the reason, the log file cannot be found. The JobManager must be restarted to regenerate the log file before continuing to run jobs.
[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation
[ https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633165#comment-14633165 ]

ASF GitHub Bot commented on FLINK-1723:
---------------------------------------

Github user sachingoel0101 commented on a diff in the pull request:
https://github.com/apache/flink/pull/891#discussion_r34975724

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
[... full file diff as quoted earlier in this thread ...]
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

However, in case the parallelism of the data is more than one, this can lead to problems. The random number sequence generated on every node would be the same, wouldn't it? I printed all the random numbers generated and it looks like this: https://gist.github.com/sachingoel0101/ecde269af996fba7a39a
Further, for a parallelism of 2, the test itself fails.

Add cross validation for model evaluation - Key:
[jira] [Commented] (FLINK-2299) The slot on which the task manager was scheduled was killed
[ https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633166#comment-14633166 ]

Ufuk Celebi commented on FLINK-2299:
------------------------------------

The FAQ is on the website repository flink-web (http://flink.apache.org/community.html#source-code).

The slot on which the task manager was scheduled was killed
-----------------------------------------------------------

Key: FLINK-2299
URL: https://issues.apache.org/jira/browse/FLINK-2299
Project: Flink
Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Priority: Critical
Fix For: 0.9.1

The following code:
https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
Ran on the twitter follower graph: http://twitter.mpi-sws.org/data-icwsm2010.html
With a similar configuration to the one in FLINK-2293, fails with the following exception:

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ wally025 - 8 slots - URL: akka.tcp://flink@130.149.249.35:56135/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

06/29/2015 10:33:46 Job execution switched to status FAILING.

The logs are here: https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing
[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation
[ https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633189#comment-14633189 ]

ASF GitHub Bot commented on FLINK-2362:
---------------------------------------

Github user pp86 commented on the pull request:
https://github.com/apache/flink/pull/922#issuecomment-122815918

Thanks for the suggestion @mxm, I updated the documentation.

distinct is missing in DataSet API documentation
------------------------------------------------

Key: FLINK-2362
URL: https://issues.apache.org/jira/browse/FLINK-2362
Project: Flink
Issue Type: Bug
Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
Fix For: 0.10, 0.9.1

The DataSet transformation {{distinct}} is not described or listed in the documentation. It is not contained in the DataSet API programming guide (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html) and not in the DataSet Transformations page (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html).
[jira] [Created] (FLINK-2380) Allow to configure default FS for file inputs
Ufuk Celebi created FLINK-2380:
----------------------------------

Summary: Allow to configure default FS for file inputs
Key: FLINK-2380
URL: https://issues.apache.org/jira/browse/FLINK-2380
Project: Flink
Issue Type: Improvement
Components: JobManager
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Priority: Minor
Fix For: 0.10

File inputs use file:// as default prefix. A user asked to make this configurable, e.g. hdfs:// as default. (I'm not sure whether this is already possible or not. I will check and either close or implement this for the user.)
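To illustrate the behaviour the ticket wants to make configurable, assuming a placeholder namenode address: today a scheme-less path resolves against file://, while any other file system must be named explicitly:

import org.apache.flink.api.scala._

object FileInputSchemes {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // No scheme: currently resolves to file:///data/input.txt
    val local = env.readTextFile("/data/input.txt")

    // Explicit scheme: required today for any non-local file system. The
    // proposal is to let a configured default (e.g. hdfs://) apply to
    // scheme-less paths instead of file://.
    val remote = env.readTextFile("hdfs://namenode:9000/data/input.txt")

    local.union(remote).first(10).print()
  }
}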
[jira] [Resolved] (FLINK-2299) The slot on which the task manager was scheduled was killed
[ https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andra Lungu resolved FLINK-2299.
--------------------------------
Resolution: Fixed
Fix Version/s: (was: 0.9.1)
               0.10

The slot on which the task manager was scheduled was killed
-----------------------------------------------------------

Key: FLINK-2299
URL: https://issues.apache.org/jira/browse/FLINK-2299
Project: Flink
Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Andra Lungu
Priority: Critical
Fix For: 0.10

The following code:
https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
Ran on the twitter follower graph: http://twitter.mpi-sws.org/data-icwsm2010.html
With a similar configuration to the one in FLINK-2293, fails with the following exception:

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ wally025 - 8 slots - URL: akka.tcp://flink@130.149.249.35:56135/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    [... full stack trace as in the first FLINK-2299 message above ...]

06/29/2015 10:33:46 Job execution switched to status FAILING.

The logs are here: https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing
[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation
[ https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633428#comment-14633428 ]

ASF GitHub Bot commented on FLINK-2362:
---------------------------------------

Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/922#discussion_r34984880

--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
     </td>
   </tr>
+  <tr>
+    <td><strong>Distinct</strong></td>
+    <td>
+      <p>Returns the distinct elements of a data set.</p>
--- End diff --

Could you add a sentence here to explain what distinct means?

distinct is missing in DataSet API documentation
------------------------------------------------

Key: FLINK-2362
URL: https://issues.apache.org/jira/browse/FLINK-2362
Project: Flink
Issue Type: Bug
Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
Assignee: pietro pinoli
Fix For: 0.10, 0.9.1

The DataSet transformation {{distinct}} is not described or listed in the documentation. It is not contained in the DataSet API programming guide (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html) and not in the DataSet Transformations page (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html).
[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/922#discussion_r34984880

--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
 </td>
 </tr>
+<tr>
+  <td><strong>Distinct</strong></td>
+  <td>
+    <p>Returns the distinct elements of a data set.</p>
--- End diff --

Could you add a sentence here to explain what distinct means?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
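For readers following the thread, the semantics being asked for are easy to illustrate. A minimal sketch of how distinct is used in the Java DataSet API (the input values below are made up for illustration; the wording in the merged docs may differ):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A small input with one duplicate element.
        DataSet<Tuple2<Integer, String>> input = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

        // Full-element distinct: duplicates are removed, yielding (1,a) and (2,b).
        DataSet<Tuple2<Integer, String>> unique = input.distinct();

        // Distinct on the first tuple field only: one element is kept per key.
        DataSet<Tuple2<Integer, String>> uniqueByKey = input.distinct(0);

        unique.print();
        uniqueByKey.print();
    }
}
{code}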
[jira] [Commented] (FLINK-2299) The slot on which the task manager was scheduled was killed
[ https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633422#comment-14633422 ] Andra Lungu commented on FLINK-2299:
------------------------------------
Thanks, Ufuk! :)

The slot on which the task manager was scheduled was killed
-----------------------------------------------------------
Key: FLINK-2299
URL: https://issues.apache.org/jira/browse/FLINK-2299
Project: Flink
Issue Type: Bug
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Andra Lungu
Priority: Critical
Fix For: 0.9.1

The following code: https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
run on the twitter follower graph (http://twitter.mpi-sws.org/data-icwsm2010.html) with a configuration similar to the one in FLINK-2293, fails with the following exception:

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ wally025 - 8 slots - URL: akka.tcp://flink@130.149.249.35:56135/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
06/29/2015 10:33:46 Job execution switched to status FAILING.

The logs are here: https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters
[ https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633514#comment-14633514 ] ASF GitHub Bot commented on FLINK-2375:
---
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-122868758

Could you also do a git amend to reference the correct JIRA issue in the commit?

Add Approximate Adamic Adar Similarity method using BloomFilters
----------------------------------------------------------------
Key: FLINK-2375
URL: https://issues.apache.org/jira/browse/FLINK-2375
Project: Flink
Issue Type: Task
Components: Gelly
Reporter: Shivani Ghatge
Assignee: Shivani Ghatge
Priority: Minor

Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a set of nodes. However, instead of counting the common neighbors and dividing them by the total number of neighbors, the similarity is weighted according to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
The Adamic-Adar algorithm can be broken into three steps:
1) For each vertex, compute the log of its inverse degree (with the formula above) and set it as the vertex value.
2) Each vertex will then send this new computed value along with a list of neighbors to the targets of its out-edges.
3) Weigh the edges with the Adamic-Adar index: Sum over n from CN of log(1/k_n) (CN is the set of all common neighbors of two vertices x, y; k_n is the degree of node n). See [2]
Using BloomFilters we increase the scalability of the algorithm. The values calculated for the edges will be approximate.
Prerequisites:
- Full understanding of the Jaccard Similarity Measure algorithm
- Reading the associated literature:
[1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
[2] http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-122868758 Could you also do a git ammend to reference the correct jira issue in the commit? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633525#comment-14633525 ] Gabor Gevay commented on FLINK-2377: Yes, I have found the problem: TestBaseUtils.readAllResultLines is not closing the readers. I will open a PR that fixes this. AbstractTestBase.deleteAllTempFiles sometimes fails on Windows -- Key: FLINK-2377 URL: https://issues.apache.org/jira/browse/FLINK-2377 Project: Flink Issue Type: Bug Components: Tests Environment: Windows Reporter: Gabor Gevay Priority: Minor This is probably another file closing issue. (that is, Windows won't delete open files, as opposed to Linux) I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...
GitHub user ggevay opened a pull request: https://github.com/apache/flink/pull/924 [FLINK-2377] Add reader.close() to readAllResultLines You can merge this pull request into a Git repository by running: $ git pull https://github.com/ggevay/flink readAllResultLinesFix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #924 commit 63b71d7c2364652030fdca360b9804c344da56a3 Author: Gabor Gevay gga...@gmail.com Date: 2015-07-20T12:39:43Z [FLINK-2377] Add reader.close() to readAllResultLines --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
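The gist of the fix is worth spelling out for readers hitting the same Windows symptom. A hedged sketch of the pattern, assuming a simplified stand-in for the real TestBaseUtils.readAllResultLines signature (the helper name and arguments below are illustrative, not the exact code in the PR):

{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: read all result lines while guaranteeing the reader is
// closed, so that Windows can delete the temp file afterwards.
public final class ReadAllLinesSketch {
    public static List<String> readAllResultLines(String path) throws IOException {
        List<String> lines = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new FileReader(path));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } finally {
            reader.close(); // the missing close() that kept files locked on Windows
        }
        return lines;
    }
}
{code}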
[jira] [Assigned] (FLINK-2381) Possible class not found Exception on failed partition producer
[ https://issues.apache.org/jira/browse/FLINK-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reassigned FLINK-2381:
----------------------------------
Assignee: Ufuk Celebi

Possible class not found Exception on failed partition producer
---------------------------------------------------------------
Key: FLINK-2381
URL: https://issues.apache.org/jira/browse/FLINK-2381
Project: Flink
Issue Type: Improvement
Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Fix For: 0.10, 0.9.1

Failing the production of a result partition marks the respective partition as failed with a ProducerFailedException. The cause of this exception can be a user-defined class, which can only be loaded by the user code class loader. The network stack fails the shuffle with a RemoteTransportException, which has the user exception as its cause. When the consuming task receives this exception, this leads to a class not found exception, because the network stack tries to load the class with the system class loader.

{code}
+----------+
| FAILING  |
| PRODUCER |
+----------+
     ||
     \/
ProducerFailedException(CAUSE) via network
     ||
     \/
+----------+
| RECEIVER |
+----------+
{code}

CAUSE is only loadable by the user code class loader. When trying to deserialize this, RECEIVER fails with a LocalTransportException, which is super confusing, because the error is not local, but remote.

Thanks to [~rmetzger] for reporting and debugging the issue with the following stack trace:

{code}
Flat Map (26/120)
14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask - Flat Map (26/120) failed
java.lang.RuntimeException: Could not read next record.
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
{code}
[jira] [Created] (FLINK-2381) Possible class not found Exception on failed partition producer
Ufuk Celebi created FLINK-2381:
-------------------------------
Summary: Possible class not found Exception on failed partition producer
Key: FLINK-2381
URL: https://issues.apache.org/jira/browse/FLINK-2381
Project: Flink
Issue Type: Improvement
Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Fix For: 0.10, 0.9.1

Failing the production of a result partition marks the respective partition as failed with a ProducerFailedException. The cause of this exception can be a user-defined class, which can only be loaded by the user code class loader. The network stack fails the shuffle with a RemoteTransportException, which has the user exception as its cause. When the consuming task receives this exception, this leads to a class not found exception, because the network stack tries to load the class with the system class loader.

{code}
+----------+
| FAILING  |
| PRODUCER |
+----------+
     ||
     \/
ProducerFailedException(CAUSE) via network
     ||
     \/
+----------+
| RECEIVER |
+----------+
{code}

CAUSE is only loadable by the user code class loader. When trying to deserialize this, RECEIVER fails with a LocalTransportException, which is super confusing, because the error is not local, but remote.

Thanks to [~rmetzger] for reporting and debugging the issue with the following stack trace:

{code}
Flat Map (26/120)
14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask - Flat Map (26/120) failed
java.lang.RuntimeException: Could not read next record.
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
{code}
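For readers unfamiliar with the failure mode: Java's default ObjectInputStream resolves classes against the default class loader, so a user-code exception travelling through the network stack cannot be deserialized there. A hedged sketch of the usual remedy, deserializing against an explicitly supplied class loader; this is the general Java pattern, not necessarily the fix that was applied in Flink:

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// General pattern: an ObjectInputStream that resolves classes against a given
// class loader (e.g. the user-code class loader) instead of the default one.
public class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    public ClassLoaderObjectInputStream(byte[] bytes, ClassLoader classLoader) throws IOException {
        super(new ByteArrayInputStream(bytes));
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the supplied (user-code) class loader first.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution.
            return super.resolveClass(desc);
        }
    }
}
{code}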
[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
[ https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633628#comment-14633628 ] ASF GitHub Bot commented on FLINK-2377: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/924#issuecomment-122893574 Great catch! Looks good to merge. AbstractTestBase.deleteAllTempFiles sometimes fails on Windows -- Key: FLINK-2377 URL: https://issues.apache.org/jira/browse/FLINK-2377 Project: Flink Issue Type: Bug Components: Tests Environment: Windows Reporter: Gabor Gevay Priority: Minor This is probably another file closing issue. (that is, Windows won't delete open files, as opposed to Linux) I have encountered two concrete tests so far where this actually appears: CsvOutputFormatITCase and CollectionSourceTest. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/924#issuecomment-122893574 Great catch! Looks good to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks
Aljoscha Krettek created FLINK-2382: --- Summary: Live Metric Reporting Does Not Work for Two-Input StreamTasks Key: FLINK-2382 URL: https://issues.apache.org/jira/browse/FLINK-2382 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: Aljoscha Krettek Also, there are no tests for the live metrics in streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1967] Introduce (Event)time in Streamin...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/906#issuecomment-122949512

Does anyone still have objections? The timestamps are now completely disabled by default, so they have no impact unless enabled. If there are no objections, I would like to merge this by tomorrow.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1967) Introduce (Event)time in Streaming
[ https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633826#comment-14633826 ] ASF GitHub Bot commented on FLINK-1967:
---
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/906#issuecomment-122949512

Does anyone still have objections? The timestamps are now completely disabled by default, so they have no impact unless enabled. If there are no objections, I would like to merge this by tomorrow.

Introduce (Event)time in Streaming
----------------------------------
Key: FLINK-1967
URL: https://issues.apache.org/jira/browse/FLINK-1967
Project: Flink
Issue Type: Improvement
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek

This requires introducing a timestamp in the streaming record and a change in the sources to add timestamps to records. This will also introduce punctuations (or low watermarks) to allow windows to work correctly on unordered, timestamped input data. In the process, the windowing subsystem also needs to be adapted to use the punctuations. Furthermore, all operators need to be made aware of punctuations and correctly forward them. Then, a new operator must be introduced to allow modification of timestamps.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
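The two stream element kinds described in the issue are easy to picture. A conceptual sketch, with illustrative types only, not Flink's actual streaming record classes:

{code}
// Conceptual sketch of the element kinds described above (illustration only).
public class StreamElements {
    /** A record carrying an event-time timestamp. */
    static class TimestampedRecord<T> {
        final T value;
        final long timestamp; // event time, e.g. milliseconds since epoch
        TimestampedRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * A punctuation (low watermark): a promise that no record with a lower
     * timestamp will arrive anymore, which is what lets windows fire
     * correctly on unordered input.
     */
    static class Watermark {
        final long timestamp;
        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }
}
{code}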
[jira] [Commented] (FLINK-2371) AccumulatorLiveITCase fails
[ https://issues.apache.org/jira/browse/FLINK-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14633752#comment-14633752 ] ASF GitHub Bot commented on FLINK-2371:
---
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the accumulator values, we rely on message passing to synchronize with the task's progress. The task signals to the task manager that it has updated its accumulator values. The task manager lets the job manager know and sends out the heartbeat which contains the accumulators. When the job manager receives the accumulators and has been notified previously, it sends a message with the current accumulators to the subscribed test case. This ensures that all processes are always synchronized correctly and that we can verify the live accumulator results reliably.

In the course of rewriting the test, I had to change two things in the implementation:
a) User accumulators are now immediately serialized as well. Otherwise, Akka does not serialize messages in local single-VM setups and simply passes the live accumulator map through.
b) The asynchronous update of the accumulators can be disabled for tests. This was necessary because we cannot guarantee when the Future for updating the accumulators is executed. In real setups this is negligible.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/flink live-accumulators
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/925.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #925

commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date: 2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the accumulator values, we rely on message passing to synchronize with the task's progress. The task signals to the task manager that it has updated its accumulator values. The task manager lets the job manager know and sends out the heartbeat which contains the accumulators. When the job manager receives the accumulators and has been notified previously, it sends a message with the current accumulators to the subscribed test case. This ensures that all processes are always synchronized correctly and that we can verify the live accumulator results reliably.

In the course of rewriting the test, I had to change two things in the implementation:
a) User accumulators are now immediately serialized as well. Otherwise, Akka does not serialize messages in local single-VM setups and simply passes the live accumulator map through.
b) The asynchronous update of the accumulators can be disabled for tests. This was necessary because we cannot guarantee when the Future for updating the accumulators is executed. In real setups this is negligible.

AccumulatorLiveITCase fails
---------------------------
Key: FLINK-2371
URL: https://issues.apache.org/jira/browse/FLINK-2371
Project: Flink
Issue Type: Bug
Components: Tests
Reporter: Matthias J. Sax
Assignee: Maximilian Michels

AccumulatorLiveITCase fails regularly (however, not in each run). The test relies on timing (via sleep), which does not work well on Travis.
See dev-list for more details: https://mail-archives.apache.org/mod_mbox/flink-dev/201507.mbox/browser
AccumulatorLiveITCase.testProgram:106-access$1100:68-checkFlinkAccumulators:189

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks
[ https://issues.apache.org/jira/browse/FLINK-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-2382: - Assignee: Maximilian Michels Live Metric Reporting Does Not Work for Two-Input StreamTasks - Key: FLINK-2382 URL: https://issues.apache.org/jira/browse/FLINK-2382 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: Aljoscha Krettek Assignee: Maximilian Michels Also, there are no tests for the live metrics in streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the accumulator values, we rely on message passing to synchronize with the task's progress. The task signals to the task manager that it has updated its accumulator values. The task manager lets the job manager know and sends out the heartbeat which contains the accumulators. When the job manager receives the accumulators and has been notified previously, it sends a message with the current accumulators to the subscribed test case. This ensures that all processes are always synchronized correctly and that we can verify the live accumulator results reliably.

In the course of rewriting the test, I had to change two things in the implementation:
a) User accumulators are now immediately serialized as well. Otherwise, Akka does not serialize messages in local single-VM setups and simply passes the live accumulator map through.
b) The asynchronous update of the accumulators can be disabled for tests. This was necessary because we cannot guarantee when the Future for updating the accumulators is executed. In real setups this is negligible.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/flink live-accumulators
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/925.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #925

commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels m...@apache.org
Date: 2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the accumulator values, we rely on message passing to synchronize with the task's progress. The task signals to the task manager that it has updated its accumulator values. The task manager lets the job manager know and sends out the heartbeat which contains the accumulators. When the job manager receives the accumulators and has been notified previously, it sends a message with the current accumulators to the subscribed test case. This ensures that all processes are always synchronized correctly and that we can verify the live accumulator results reliably.

In the course of rewriting the test, I had to change two things in the implementation:
a) User accumulators are now immediately serialized as well. Otherwise, Akka does not serialize messages in local single-VM setups and simply passes the live accumulator map through.
b) The asynchronous update of the accumulators can be disabled for tests. This was necessary because we cannot guarantee when the Future for updating the accumulators is executed. In real setups this is negligible.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
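The core idea of the PR, replacing sleep-based timing with an explicit readiness signal, generalizes beyond this test. A stripped-down, hedged illustration (plain JDK primitives, not the actual Akka-based test code):

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stripped-down illustration: instead of sleeping and hoping the update has
// happened, the producer signals exactly when the value is ready.
public class SyncByMessageSketch {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch updated = new CountDownLatch(1);
        final long[] accumulator = new long[1];

        Thread task = new Thread(new Runnable() {
            @Override
            public void run() {
                accumulator[0] = 42;  // simulate the accumulator update
                updated.countDown();  // signal readiness instead of relying on timing
            }
        });
        task.start();

        // Fails fast if the signal never arrives, instead of flaking on slow machines.
        if (!updated.await(10, TimeUnit.SECONDS)) {
            throw new AssertionError("timed out waiting for accumulator update");
        }
        if (accumulator[0] != 42) {
            throw new AssertionError("unexpected accumulator value");
        }
        task.join();
    }
}
{code}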
[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example
[ https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634123#comment-14634123 ] ASF GitHub Bot commented on FLINK-2310: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123051793 @andralungu @vasia PR has been updated to make the code more efficient. Add an Adamic-Adar Similarity example - Key: FLINK-2310 URL: https://issues.apache.org/jira/browse/FLINK-2310 Project: Flink Issue Type: Task Components: Gelly Reporter: Andra Lungu Assignee: Shivani Ghatge Priority: Minor Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a set of nodes. However, instead of counting the common neighbors and dividing them by the total number of neighbors, the similarity is weighted according to the vertex degrees. In particular, it's equal to log(1/numberOfEdges). The Adamic-Adar algorithm can be broken into three steps: 1). For each vertex, compute the log of its inverse degrees (with the formula above) and set it as the vertex value. 2). Each vertex will then send this new computed value along with a list of neighbors to the targets of its out-edges 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is the degree of node n). See [2] Prerequisites: - Full understanding of the Jaccard Similarity Measure algorithm - Reading the associated literature: [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf [2] http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-123051521 Updated PR @vasia I removed the Vertex Centric Iteration :) Hope this method is okay. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123051793 @andralungu @vasia PR has been updated to make the code more efficient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters
[ https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634119#comment-14634119 ] ASF GitHub Bot commented on FLINK-2375: --- Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-123051521 Updated PR @vasia I removed the Vertex Centric Iteration :) Hope this method is okay. Add Approximate Adamic Adar Similarity method using BloomFilters Key: FLINK-2375 URL: https://issues.apache.org/jira/browse/FLINK-2375 Project: Flink Issue Type: Task Components: Gelly Reporter: Shivani Ghatge Assignee: Shivani Ghatge Priority: Minor Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a set of nodes. However, instead of counting the common neighbors and dividing them by the total number of neighbors, the similarity is weighted according to the vertex degrees. In particular, it's equal to log(1/numberOfEdges). The Adamic-Adar algorithm can be broken into three steps: 1). For each vertex, compute the log of its inverse degrees (with the formula above) and set it as the vertex value. 2). Each vertex will then send this new computed value along with a list of neighbors to the targets of its out-edges 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is the degree of node n). See [2] Using BloomFilters we increase the scalability of the algorithm. The values calculated for the edges will be approximate. Prerequisites: Full understanding of the Jaccard Similarity Measure algorithm Reading the associated literature: [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf [2] http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar -- This message was sent by Atlassian JIRA (v6.3.4#6332)
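For readers skimming the thread, step 1 of the algorithm quoted in the issue is compact enough to sketch in isolation. A hedged, plain-Java illustration of the formula only (not the Gelly implementation under review):

{code}
import java.util.HashMap;
import java.util.Map;

// Illustration of step 1 above: every vertex gets log(1/degree) as its value.
public class AdamicAdarWeights {
    public static Map<Long, Double> logInverseDegrees(Map<Long, Integer> degrees) {
        Map<Long, Double> weights = new HashMap<>();
        for (Map.Entry<Long, Integer> entry : degrees.entrySet()) {
            int k = entry.getValue();
            if (k > 0) { // vertices without edges contribute no weight
                weights.put(entry.getKey(), Math.log(1.0 / k)); // log(1/k_n)
            }
        }
        return weights;
    }
}
{code}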
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-123057981

The only problem with assuming NullValue when a value is missing is that we can't return NullValue in place of VV. That is, in Graph<K, VV, EV>, VV or EV can't be NullValue; otherwise, that was what I was originally going for. Since none of the other methods to create a DataSet/Graph provide a way to set the edge value to NullValue and instead expect the user to map it (at least that is what I saw), maybe we could just remove the functionality. I had only added it because many examples seemed to use it, so I thought it would be nice to have. In any case, we could keep just one typesNullEdge method as well: users who don't want it can use the normal overloaded types method (3 arguments for no NullValue, 2 arguments for a null vertex value, 1 argument for null vertex and edge values), and the single method named typesNullEdge indicates that only the edges have NullValue.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files
[ https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634142#comment-14634142 ] ASF GitHub Bot commented on FLINK-1520:
---
Github user shghatge commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-123057981

The only problem with assuming NullValue when a value is missing is that we can't return NullValue in place of VV. That is, in Graph<K, VV, EV>, VV or EV can't be NullValue; otherwise, that was what I was originally going for. Since none of the other methods to create a DataSet/Graph provide a way to set the edge value to NullValue and instead expect the user to map it (at least that is what I saw), maybe we could just remove the functionality. I had only added it because many examples seemed to use it, so I thought it would be nice to have. In any case, we could keep just one typesNullEdge method as well: users who don't want it can use the normal overloaded types method (3 arguments for no NullValue, 2 arguments for a null vertex value, 1 argument for null vertex and edge values), and the single method named typesNullEdge indicates that only the edges have NullValue.

Read edges and vertices from CSV files
--------------------------------------
Key: FLINK-1520
URL: https://issues.apache.org/jira/browse/FLINK-1520
Project: Flink
Issue Type: New Feature
Components: Gelly
Reporter: Vasia Kalavri
Assignee: Shivani Ghatge
Priority: Minor
Labels: easyfix, newbie

Add methods to create Vertex and Edge Datasets directly from CSV file inputs.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
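To make the API question concrete, here is a hedged sketch of the overloads being discussed. These signatures are the proposal under debate, not an existing Gelly API, and the reader object is assumed to come from the proposed CSV entry point:

{code}
// Hypothetical shapes of the proposed methods (illustration only; `reader` is
// assumed to be the object returned by the proposed Graph-from-CSV entry point).

// 3 type arguments: vertex and edge values both present.
Graph<Long, Double, Double> full = reader.types(Long.class, Double.class, Double.class);

// 2 type arguments: the vertex value becomes NullValue.
Graph<Long, NullValue, Double> nullVertex = reader.types(Long.class, Double.class);

// 1 type argument: vertex and edge values both become NullValue.
Graph<Long, NullValue, NullValue> keysOnly = reader.types(Long.class);

// Dedicated method: only the edge value becomes NullValue.
Graph<Long, Double, NullValue> nullEdge = reader.typesNullEdge(Long.class, Double.class);
{code}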
[jira] [Commented] (FLINK-685) Add support for semi-joins
[ https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634358#comment-14634358 ] pietro pinoli commented on FLINK-685:
---
Hi [~fhueske], then if I implement the dummy version (your first alternative), does it have a chance of getting merged? :) Thanks again for your time. PP

Add support for semi-joins
--------------------------
Key: FLINK-685
URL: https://issues.apache.org/jira/browse/FLINK-685
Project: Flink
Issue Type: New Feature
Reporter: GitHub Import
Priority: Minor
Labels: github-import
Fix For: pre-apache

A semi-join is basically a join filter. One input is filtering and the other one is filtered. A tuple of the filtered input is emitted exactly once if the filtering input has one (or more) tuples with matching join keys. That means that the output of a semi-join has the same type as the filtered input, and the filtering input is completely discarded.
In order to support a semi-join, we need to add an additional physical execution strategy that ensures that a tuple of the filtered input is emitted only once if the filtering input has more than one tuple with matching keys. Furthermore, a couple of optimizations compared to standard joins can be done, such as storing only the keys, and not the full tuples, of the filtering input in a hash table.
Imported from GitHub
Url: https://github.com/stratosphere/stratosphere/issues/685
Created by: [fhueske|https://github.com/fhueske]
Labels: enhancement, java api, runtime,
Milestone: Release 0.6 (unplanned)
Created at: Mon Apr 14 12:05:29 CEST 2014
State: open

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-685) Add support for semi-joins
[ https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] pietro pinoli reassigned FLINK-685:
---
Assignee: pietro pinoli

Add support for semi-joins
--------------------------
Key: FLINK-685
URL: https://issues.apache.org/jira/browse/FLINK-685
Project: Flink
Issue Type: New Feature
Reporter: GitHub Import
Assignee: pietro pinoli
Priority: Minor
Labels: github-import
Fix For: pre-apache

A semi-join is basically a join filter. One input is filtering and the other one is filtered. A tuple of the filtered input is emitted exactly once if the filtering input has one (or more) tuples with matching join keys. That means that the output of a semi-join has the same type as the filtered input, and the filtering input is completely discarded.
In order to support a semi-join, we need to add an additional physical execution strategy that ensures that a tuple of the filtered input is emitted only once if the filtering input has more than one tuple with matching keys. Furthermore, a couple of optimizations compared to standard joins can be done, such as storing only the keys, and not the full tuples, of the filtering input in a hash table.
Imported from GitHub
Url: https://github.com/stratosphere/stratosphere/issues/685
Created by: [fhueske|https://github.com/fhueske]
Labels: enhancement, java api, runtime,
Milestone: Release 0.6 (unplanned)
Created at: Mon Apr 14 12:05:29 CEST 2014
State: open

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
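The "dummy version" mentioned in the comment, a semi-join built from existing operators rather than a dedicated physical strategy, can be sketched with coGroup. This is one plausible reading of that alternative, not a confirmed design from the thread:

{code}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Semi-join sketch: emit each tuple of the filtered input exactly once if the
// filtering input contains at least one tuple with a matching key.
public class SemiJoinSketch {
    public static DataSet<Tuple2<Long, String>> semiJoin(
            DataSet<Tuple2<Long, String>> filtered,    // input whose tuples are emitted
            DataSet<Tuple2<Long, Integer>> filtering) { // input used only as a key filter
        return filtered.coGroup(filtering)
                .where(0).equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, Tuple2<Long, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, String>> left,
                                        Iterable<Tuple2<Long, Integer>> right,
                                        Collector<Tuple2<Long, String>> out) {
                        // Emit the left side only if the right side is non-empty;
                        // duplicate keys on the right cannot multiply the output.
                        if (right.iterator().hasNext()) {
                            for (Tuple2<Long, String> t : left) {
                                out.collect(t);
                            }
                        }
                    }
                });
    }
}
{code}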
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123153943 Hi @shghatge , Did you also run the two examples on the cluster to make sure that the approximate version is faster? Then you could also add some numbers to the two PRs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example
[ https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634516#comment-14634516 ] ASF GitHub Bot commented on FLINK-2310: --- Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123153943 Hi @shghatge , Did you also run the two examples on the cluster to make sure that the approximate version is faster? Then you could also add some numbers to the two PRs. Add an Adamic-Adar Similarity example - Key: FLINK-2310 URL: https://issues.apache.org/jira/browse/FLINK-2310 Project: Flink Issue Type: Task Components: Gelly Reporter: Andra Lungu Assignee: Shivani Ghatge Priority: Minor Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a set of nodes. However, instead of counting the common neighbors and dividing them by the total number of neighbors, the similarity is weighted according to the vertex degrees. In particular, it's equal to log(1/numberOfEdges). The Adamic-Adar algorithm can be broken into three steps: 1). For each vertex, compute the log of its inverse degrees (with the formula above) and set it as the vertex value. 2). Each vertex will then send this new computed value along with a list of neighbors to the targets of its out-edges 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is the degree of node n). See [2] Prerequisites: - Full understanding of the Jaccard Similarity Measure algorithm - Reading the associated literature: [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf [2] http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar -- This message was sent by Atlassian JIRA (v6.3.4#6332)