[jira] [Commented] (FLINK-2075) Shade akka and protobuf dependencies away

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633114#comment-14633114
 ] 

ASF GitHub Bot commented on FLINK-2075:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122784993
  
I think you've referenced the wrong jira ticket.


> Shade akka and protobuf dependencies away
> -
>
> Key: FLINK-2075
> URL: https://issues.apache.org/jira/browse/FLINK-2075
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Till Rohrmann
> Fix For: 0.9
>
>
> Lately, the Zeppelin project encountered the following problem: it includes 
> flink-runtime, which depends on akka_remote:2.3.7, which in turn depends on 
> protobuf-java:2.5.0. However, Zeppelin sets the protobuf-java version to 2.4.1 
> to make it build with YARN 2.2. As a result, akka_remote picks up the wrong 
> protobuf-java version and fails because of an incompatible change between 
> these versions.
> I propose to shade away Flink's akka and protobuf dependencies, so 
> that user projects depending on Flink are not forced to use a specific 
> akka/protobuf version.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2075] Add Approximate Adamic Adar Simil...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122784993
  
I think you've referenced the wrong jira ticket.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2312) Random Splits

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633116#comment-14633116
 ] 

ASF GitHub Bot commented on FLINK-2312:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/921#issuecomment-122785194
  
I agree, a generalized implementation would be favorable.


> Random Splits
> -
>
> Key: FLINK-2312
> URL: https://issues.apache.org/jira/browse/FLINK-2312
> Project: Flink
>  Issue Type: Wish
>  Components: Machine Learning Library
>Reporter: Maximilian Alber
>Assignee: pietro pinoli
>Priority: Minor
>
> In machine learning applications it is common to split data sets into, e.g., 
> training and testing sets.
> To the best of my knowledge there is currently no convenient way in Flink to 
> split a data set randomly into several partitions according to some ratio.
> The desired semantics would be the same as Spark's RDD randomSplit.
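For illustration, here is a minimal sketch of such randomSplit-like semantics built only 
on the existing `filter` operator (the helper name `randomSplit` and the hash-based 
assignment are assumptions made for this sketch, not the proposed implementation). 
Because every element is assigned independently, the ratio holds only in expectation:

    import org.apache.flink.api.scala._

    import scala.util.Random

    object RandomSplitSketch {

      /** Hypothetical helper mirroring Spark's RDD.randomSplit for two splits:
        * each element goes to the first DataSet with probability `ratio`, so the
        * ratio is only approximate, especially for small inputs. */
      def randomSplit[T](input: DataSet[T], ratio: Double, seed: Long): (DataSet[T], DataSet[T]) = {
        // Deterministic pseudo-random draw in [0, 1) per element, derived from the
        // element and the seed, so both filters agree on the assignment regardless
        // of parallelism or processing order.
        def draw(value: T): Double = new Random(seed ^ value.hashCode()).nextDouble()

        val first = input.filter(v => draw(v) < ratio)
        val second = input.filter(v => draw(v) >= ratio)
        (first, second)
      }
    }

Tying the draw to the element's hashCode keeps the two filters consistent with each other, 
at the cost that identical elements always end up in the same split.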



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2312][ml][WIP] Randomly Splitting a Dat...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/921#issuecomment-122785194
  
I agree, a generalized implementation would be favorable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2312) Random Splits

2015-07-20 Thread Maximilian Alber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633118#comment-14633118
 ] 

Maximilian Alber commented on FLINK-2312:
-

I agree too.

Something else: how do you ensure the ratios? As far as I can see, they are only
approximately maintained when you have a large number of samples.



> Random Splits
> -
>
> Key: FLINK-2312
> URL: https://issues.apache.org/jira/browse/FLINK-2312
> Project: Flink
>  Issue Type: Wish
>  Components: Machine Learning Library
>Reporter: Maximilian Alber
>Assignee: pietro pinoli
>Priority: Minor
>
> In machine learning applications it is common to split data sets into, e.g., 
> training and testing sets.
> To the best of my knowledge there is currently no convenient way in Flink to 
> split a data set randomly into several partitions according to some ratio.
> The desired semantics would be the same as Spark's RDD randomSplit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2379) Add methods to evaluate field wise statistics over DataSet of vectors.

2015-07-20 Thread Sachin Goel (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Goel updated FLINK-2379:
---
Description: 
Design methods to evaluate statistics over a DataSet of vectors.
For continuous fields: minimum, maximum, mean, variance.
For discrete fields: class counts, entropy, Gini impurity.

Further statistical measures can also be supported, for example the correlation 
between two series, computing the covariance matrix, etc. 
[These are the statistics Spark currently supports.]

  was:
Design methods to evaluate statistics over dataset of vectors.
For continuous fields, Minimum, maximum, mean, variance.
For discrete fields, Class counts, Entropy, Gini Impurity.

 Issue Type: New Feature  (was: Bug)

> Add methods to evaluate field wise statistics over DataSet of vectors.
> --
>
> Key: FLINK-2379
> URL: https://issues.apache.org/jira/browse/FLINK-2379
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Sachin Goel
>
> Design methods to evaluate statistics over a DataSet of vectors.
> For continuous fields: minimum, maximum, mean, variance.
> For discrete fields: class counts, entropy, Gini impurity.
> Further statistical measures can also be supported, for example the correlation 
> between two series, computing the covariance matrix, etc. 
> [These are the statistics Spark currently supports.]
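As a rough sketch of the requested field-wise statistics (illustrative only; it assumes 
the vectors are materialized as Array[Double] of equal length, which need not be the 
representation the ML library would actually use):

    import org.apache.flink.api.scala._

    object FieldStatsSketch {

      /** Per-field running statistics: count, sum, sum of squares, minimum and maximum. */
      case class FieldStats(count: Long, sum: Array[Double], sumSq: Array[Double],
                            min: Array[Double], max: Array[Double]) {
        def mean: Array[Double] = sum.map(_ / count)
        // Population variance per field: E[x^2] - E[x]^2.
        def variance: Array[Double] =
          sumSq.zip(sum).map { case (sq, s) => sq / count - (s / count) * (s / count) }
      }

      /** Computes minimum, maximum, mean and variance for every field of the vectors. */
      def fieldStats(vectors: DataSet[Array[Double]]): DataSet[FieldStats] = {
        vectors
          .map(v => FieldStats(1L, v, v.map(x => x * x), v.clone(), v.clone()))
          .reduce { (a, b) =>
            FieldStats(
              a.count + b.count,
              a.sum.zip(b.sum).map { case (x, y) => x + y },
              a.sumSq.zip(b.sumSq).map { case (x, y) => x + y },
              a.min.zip(b.min).map { case (x, y) => math.min(x, y) },
              a.max.zip(b.max).map { case (x, y) => math.max(x, y) })
          }
      }
    }

Class counts, entropy and Gini impurity for discrete fields would follow the same pattern, 
with per-field count maps instead of running sums.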



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973140
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Given that `input` is deterministic, this code will produce mutually 
exclusive validation and training data sets.
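To make the disjointness argument concrete, here is a hypothetical sketch of a bounded 
sampler in the spirit of the `sampleBounded` extension used in the diff (the PR's actual 
RichDataSet implementation may differ): each element consumes exactly one pseudo-random 
draw in [0, 1), the fold keeps it if the draw falls inside [lb, ub), and the complement 
keeps it otherwise, so with an identical seed and a deterministic element order the two 
selections cannot overlap.

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.util.Random

    object SampleBoundedSketch {

      /** Hypothetical bounded sampler: keeps elements whose pseudo-random draw falls
        * inside [lb, ub), or outside of it when complement = true. */
      def sampleBounded[T](input: DataSet[T], lb: Double, ub: Double,
                           complement: Boolean, seed: Long): DataSet[T] =
        input.filter(new RichFilterFunction[T] {
          @transient private var rnd: Random = _

          override def open(parameters: Configuration): Unit = {
            // The same seed produces the same sequence of draws for a fold and its
            // complement, which is what makes training and validation disjoint as
            // long as each subtask sees its elements in a deterministic order.
            rnd = new Random(seed)
          }

          override def filter(value: T): Boolean = {
            val x = rnd.nextDouble()
            val inside = x >= lb && x < ub
            if (complement) !inside else inside
          }
        })
    }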


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633130#comment-14633130
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973140
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Given that `input` is deterministic, this code will produce mutually 
exclusive validation and training data sets.


> Add cross validation for model evaluation
> -
>
> Key: FLINK-1723
> URL: https://issues.apache.org/jira/browse/FLINK-1723
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>A

[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973783
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Ah yes. I see that now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633136#comment-14633136
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34973783
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

Ah yes. I see that now.


> Add cross validation for model evaluation
> -
>
> Key: FLINK-1723
> URL: https://issues.apache.org/jira/browse/FLINK-1723
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> Cross validation [1] is a stan

[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633150#comment-14633150
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34974826
  
--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 
 **Note:** Extending the set of supported aggregation functions is on our roadmap.

+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to
> The following code removes duplicate elements from the DataSet:
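For reference, a minimal, self-contained example of the transformation being documented 
(illustrative only, not taken from the pull request):

    import org.apache.flink.api.scala._

    object DistinctExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val input: DataSet[(Int, String)] = env.fromElements(
          (1, "a"), (1, "a"), (2, "b"), (2, "c"))

        // Removes exact duplicates: (1, "a") appears only once in the result.
        val unique: DataSet[(Int, String)] = input.distinct()

        unique.print()
      }
    }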


> distinct is missing in DataSet API documentation
> 
>
> Key: FLINK-2362
> URL: https://issues.apache.org/jira/browse/FLINK-2362
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Java API, Scala API
>Affects Versions: 0.9, 0.10
>Reporter: Fabian Hueske
>Assignee: pietro pinoli
> Fix For: 0.10, 0.9.1
>
>
> The DataSet transformation {{distinct}} is not described or listed in the 
> documentation. It is not contained in the DataSet API programming guide 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
>  and not in the DataSet Transformation 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34974826
  
--- Diff: docs/apis/dataset_transformations.md ---
@@ -924,6 +924,183 @@ Not supported.
 
 **Note:** Extending the set of supported aggregation functions is on our roadmap.

+### Distinct
+
+The Distinct transformation computes the DataSet of the distinct elements of the source DataSet.
+The following code removes from the DataSet the duplicate elements:
--- End diff --

I'd rephrase this sentence to
> The following code removes duplicate elements from the DataSet:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122805962
  
Thanks for improving the documentation. Looks good to merge to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633152#comment-14633152
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122805962
  
Thanks for improving the documentation. Looks good to merge to me.


> distinct is missing in DataSet API documentation
> 
>
> Key: FLINK-2362
> URL: https://issues.apache.org/jira/browse/FLINK-2362
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Java API, Scala API
>Affects Versions: 0.9, 0.10
>Reporter: Fabian Hueske
>Assignee: pietro pinoli
> Fix For: 0.10, 0.9.1
>
>
> The DataSet transformation {{distinct}} is not described or listed in the 
> documentation. It is not contained in the DataSet API programming guide 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
>  and not in the DataSet Transformation 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2367) "flink-xx-jobmanager-linux-3lsu.log" file can't be automatically recovered/detected after accidental deletion

2015-07-20 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633163#comment-14633163
 ] 

Maximilian Michels commented on FLINK-2367:
---

I'm afraid the logger is unaware of the deleted file. We would have to manually 
check whether the file still exists, but that seems to be too much effort for such 
a corner case. However, we could add a note to the documentation that deleting the 
log file at runtime is not supported.

> "flink-xx-jobmanager-linux-3lsu.log" file can't be automatically recovered/detected 
> after accidental deletion
> -
>
> Key: FLINK-2367
> URL: https://issues.apache.org/jira/browse/FLINK-2367
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9
> Environment: Linux
>Reporter: chenliang
>Priority: Minor
>  Labels: reliability
> Fix For: 0.9.0
>
>
> To check whether the system is adequately reliable, testers usually perform 
> some deliberate delete operations.
> Steps:
> 1. Go to "flink\build-target\log". 
> 2. Delete the "flink-xx-jobmanager-linux-3lsu.log" file. 
> 3. Run jobs that write log info; the system does not report any error 
> when the log info cannot be written correctly.
> 4. When some jobs fail, check the log file to find the reason; the log file 
> cannot be found. 
> The JobManager must be restarted to regenerate the log file before jobs can 
> continue to run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633162#comment-14633162
 ] 

Stephan Ewen commented on FLINK-2377:
-

Can you verify that this is a problem of the test, i.e., the test forgets to 
close the files?
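If an unclosed stream is indeed the cause, the usual fix is to close it in a finally 
block before the temp-file cleanup runs; a hedged sketch (class and method names are 
made up for illustration):

    import java.io.{File, PrintWriter}

    object CloseBeforeDeleteSketch {

      // On Windows an open file cannot be deleted, so a test has to close its streams
      // before AbstractTestBase.deleteAllTempFiles tries to remove the temp directory.
      def writeAndClose(file: File, lines: Seq[String]): Unit = {
        val writer = new PrintWriter(file)
        try {
          lines.foreach(line => writer.println(line))
        } finally {
          writer.close() // without this, deleting the temp file can fail on Windows
        }
      }
    }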

> AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
> --
>
> Key: FLINK-2377
> URL: https://issues.apache.org/jira/browse/FLINK-2377
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows
>Reporter: Gabor Gevay
>Priority: Minor
>
> This is probably another file closing issue. (that is, Windows won't delete 
> open files, as opposed to Linux)
> I have encountered two concrete tests so far where this actually appears: 
> CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2027) Flink website does not provide link to source repo

2015-07-20 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi resolved FLINK-2027.

Resolution: Fixed

I think this was reopened by accident.

> Flink website does not provide link to source repo
> --
>
> Key: FLINK-2027
> URL: https://issues.apache.org/jira/browse/FLINK-2027
> Project: Flink
>  Issue Type: Bug
>Reporter: Sebb
>Priority: Critical
>
> As the subject says - I could not find a link to the source repo anywhere 
> obvious on the website



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34975724
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

However, if the parallelism of the data is greater than one, this can lead to 
problems. The random number sequence generated on every node would be the same, 
wouldn't it?
I printed all the random numbers generated and it looks like this: 
https://gist.github.com/sachingoel0101/ecde269af996fba7a39a

Further, for a parallelism of 2, the test itself fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633165#comment-14633165
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34975724
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

However, if the parallelism of the data is greater than one, this can lead to 
problems. The random number sequence generated on every node would be the same, 
wouldn't it?
I printed all the random numbers generated and it looks like this: 
https://gist.github.com/sachingoel0101/ecde269af996fba7a39a

Further, for a parallelism of 2, the test itself fails.


> Add cross validation for model evaluation
> -
>
>  

[jira] [Commented] (FLINK-2299) The slot on which the task manager was scheduled was killed

2015-07-20 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633166#comment-14633166
 ] 

Ufuk Celebi commented on FLINK-2299:


The FAQ is on the website repository flink-web 
(http://flink.apache.org/community.html#source-code).

> The slot on which the task manager was scheduled was killed
> ---
>
> Key: FLINK-2299
> URL: https://issues.apache.org/jira/browse/FLINK-2299
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.10
>Reporter: Andra Lungu
>Priority: Critical
> Fix For: 0.9.1
>
>
> The following code: 
> https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
> run on the Twitter follower graph: 
> http://twitter.mpi-sws.org/data-icwsm2010.html 
> with a configuration similar to the one in FLINK-2293,
> fails with the following exception:
> java.lang.Exception: The slot in which the task was executed has been 
> released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
> wally025 - 8 slots - URL: 
> akka.tcp://flink@130.149.249.35:56135/user/taskmanager
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 06/29/2015 10:33:46 Job execution switched to status FAILING.
> The logs are here:
> https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1723) Add cross validation for model evaluation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633184#comment-14633184
 ] 

ASF GitHub Bot commented on FLINK-1723:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34976620
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

The test shouldn't fail. Maybe there is an error somewhere, then.

What one could do to get different sequences on each node is to XOR the 
subtask id with the seed. But IMO this does not change the statistical 
properties of the sample, because we don't know the underlying order of the 
elements. E.g. the underlying order of the elements could be such that we 
obtain the same sample set as with an identical seed and a different order.


> Add cross validation for model e

[GitHub] flink pull request: [FLINK-1723] [ml] [WIP] Add cross validation f...

2015-07-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/891#discussion_r34976620
  
--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/CrossValidation.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.evaluation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.RichDataSet
+import java.util.Random
+
+import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, FitOperation, Predictor}
+
+object CrossValidation {
+  def crossValScore[P <: Predictor[P], T](
+      predictor: P,
+      data: DataSet[T],
+      scorerOption: Option[Scorer] = None,
+      cv: FoldGenerator = KFold(),
+      seed: Long = new Random().nextLong())(implicit fitOperation: FitOperation[P, T],
+      evaluateDataSetOperation: EvaluateDataSetOperation[P, T, Double]): Array[DataSet[Double]] = {
+    val folds = cv.folds(data, 1)
+
+    val scores = folds.map {
+      case (training: DataSet[T], testing: DataSet[T]) =>
+        predictor.fit(training)
+        if (scorerOption.isEmpty) {
+          predictor.score(testing)
+        } else {
+          val s = scorerOption.get
+          s.evaluate(testing, predictor)
+        }
+    }
+    // TODO: Undecided on the return type: Array[DS[Double]] or DS[Double] i.e. reduce->union?
+    // Or: Return mean and std?
+    scores//.reduce((right: DataSet[Double], left: DataSet[Double]) => left.union(right)).mean()
+  }
+}
+
+abstract class FoldGenerator {
+
+  /** Takes a DataSet as input and creates splits (folds) of the data into
+    * (training, testing) pairs.
+    *
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])]
+}
+
+class KFold(numFolds: Int) extends FoldGenerator {
+
+  /** Takes a DataSet as input and creates K splits (folds) of the data into non-overlapping
+    * (training, testing) pairs.
+    *
+    * Code based on Apache Spark implementation
+    * @param input The DataSet that will be split into folds
+    * @param seed Seed for replicable splitting of the data
+    * @tparam T The type of the DataSet
+    * @return An Array containing K (training, testing) tuples, where training and testing are
+    *         DataSets
+    */
+  override def folds[T](
+      input: DataSet[T],
+      seed: Long = new Random().nextLong()): Array[(DataSet[T], DataSet[T])] = {
+    val numFoldsF = numFolds.toFloat
+    (1 to numFolds).map { fold =>
+      val lb = (fold - 1) / numFoldsF
+      val ub = fold / numFoldsF
+      val validation = input.sampleBounded(lb, ub, complement = false, seed = seed)
+      val training = input.sampleBounded(lb, ub, complement = true, seed = seed)
+      (training, validation)
--- End diff --

The test shouldn't fail. Maybe there is an error somewhere, then.

What one could do to get different sequences on each node is to XOR the 
subtask id with the seed. But IMO this does not change the statistical 
properties of the sample, because we don't know the underlying order of the 
elements. E.g. the underlying order of the elements could be such that we 
obtain the same sample set as with an identical seed and a different order.
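A hedged sketch of the per-subtask seeding idea described above (the object and method 
names are assumptions for illustration, not the PR's code): the subtask index is XORed 
into the user-supplied seed, so every parallel instance draws a different pseudo-random 
sequence while the result stays reproducible for a fixed seed and parallelism.

    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    import scala.util.Random

    object PerSubtaskSeedSketch {

      /** Keeps each element with probability `fraction`, using a different random
        * sequence on every parallel subtask. */
      def sampleWithPerSubtaskSeed[T](input: DataSet[T], fraction: Double, seed: Long): DataSet[T] =
        input.filter(new RichFilterFunction[T] {
          @transient private var rnd: Random = _

          override def open(parameters: Configuration): Unit = {
            // XOR the subtask id into the seed, as suggested in the comment above.
            rnd = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
          }

          override def filter(value: T): Boolean = rnd.nextDouble() < fraction
        })
    }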


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file

[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread pp86
Github user pp86 commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122815918
  
Thanks for the suggestion @mxm , I updated the documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633189#comment-14633189
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user pp86 commented on the pull request:

https://github.com/apache/flink/pull/922#issuecomment-122815918
  
Thanks for the suggestion @mxm , I updated the documentation.


> distinct is missing in DataSet API documentation
> 
>
> Key: FLINK-2362
> URL: https://issues.apache.org/jira/browse/FLINK-2362
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Java API, Scala API
>Affects Versions: 0.9, 0.10
>Reporter: Fabian Hueske
>Assignee: pietro pinoli
> Fix For: 0.10, 0.9.1
>
>
> The DataSet transformation {{distinct}} is not described or listed in the 
> documentation. It is not contained in the DataSet API programming guide 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
>  and not in the DataSet Transformation 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2380) Allow to configure default FS for file inputs

2015-07-20 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2380:
--

 Summary: Allow to configure default FS for file inputs
 Key: FLINK-2380
 URL: https://issues.apache.org/jira/browse/FLINK-2380
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Priority: Minor
 Fix For: 0.10


File inputs use "file://" as the default prefix. A user asked to make this 
configurable, e.g. to use "hdfs://" as the default.

(I'm not sure whether this is already possible. I will check and either 
close this issue or implement it for the user.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2299) The slot on which the task manager was scheduled was killed

2015-07-20 Thread Andra Lungu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andra Lungu reassigned FLINK-2299:
--

Assignee: Andra Lungu

> The slot on which the task manager was scheduled was killed
> ---
>
> Key: FLINK-2299
> URL: https://issues.apache.org/jira/browse/FLINK-2299
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.10
>Reporter: Andra Lungu
>Assignee: Andra Lungu
>Priority: Critical
> Fix For: 0.9.1
>
>
> The following code: 
> https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
> run on the Twitter follower graph: 
> http://twitter.mpi-sws.org/data-icwsm2010.html 
> with a configuration similar to the one in FLINK-2293,
> fails with the following exception:
> java.lang.Exception: The slot in which the task was executed has been 
> released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
> wally025 - 8 slots - URL: 
> akka.tcp://flink@130.149.249.35:56135/user/taskmanager
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 06/29/2015 10:33:46 Job execution switched to status FAILING.
> The logs are here:
> https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2299) The slot on which the task manager was scheduled was killed

2015-07-20 Thread Andra Lungu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633422#comment-14633422
 ] 

Andra Lungu commented on FLINK-2299:


Thanks, Ufuk! :)

> The slot on which the task manager was scheduled was killed
> ---
>
> Key: FLINK-2299
> URL: https://issues.apache.org/jira/browse/FLINK-2299
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.10
>Reporter: Andra Lungu
>Assignee: Andra Lungu
>Priority: Critical
> Fix For: 0.9.1
>
>
> The following code: 
> https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
> Ran on the twitter follower graph: 
> http://twitter.mpi-sws.org/data-icwsm2010.html 
> With a similar configuration to the one in FLINK-2293
> fails with the following exception:
> java.lang.Exception: The slot in which the task was executed has been 
> released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
> wally025 - 8 slots - URL: 
> akka.tcp://flink@130.149.249.35:56135/user/taskmanager
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 06/29/2015 10:33:46 Job execution switched to status FAILING.
> The logs are here:
> https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2299) The slot on which the task manager was scheduled was killed

2015-07-20 Thread Andra Lungu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andra Lungu resolved FLINK-2299.

   Resolution: Fixed
Fix Version/s: (was: 0.9.1)
   0.10

> The slot on which the task manager was scheduled was killed
> ---
>
> Key: FLINK-2299
> URL: https://issues.apache.org/jira/browse/FLINK-2299
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.10
>Reporter: Andra Lungu
>Assignee: Andra Lungu
>Priority: Critical
> Fix For: 0.10
>
>
> The following code: 
> https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/GSATriangleCount.java
> Ran on the twitter follower graph: 
> http://twitter.mpi-sws.org/data-icwsm2010.html 
> With a similar configuration to the one in FLINK-2293
> fails with the following exception:
> java.lang.Exception: The slot in which the task was executed has been 
> released. Probably loss of TaskManager 57c67d938c9144bec5ba798bb8ebe636 @ 
> wally025 - 8 slots - URL: 
> akka.tcp://flink@130.149.249.35:56135/user/taskmanager
> at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
> at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
> at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
> at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:154)
> at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:182)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:421)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 06/29/2015 10:33:46 Job execution switched to status FAILING.
> The logs are here:
> https://drive.google.com/file/d/0BwnaKJcSLc43M1BhNUt5NWdINHc/view?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633428#comment-14633428
 ] 

ASF GitHub Bot commented on FLINK-2362:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34984880
  
--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet> output = 
input.sum(0).andMin(2);
   
 
 
+
+  Distinct
+  
+Returns the distinct elements of a data set.
--- End diff --

Could you add a sentence here to explain what "distinct" means?
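
For context, this is what the documented transformation does in the Java DataSet API (a minimal example; the input values are made up):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The input contains duplicate tuples ...
        DataSet<Tuple2<Integer, String>> input = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

        // ... which distinct() removes: the result contains (1,"a") and (2,"b") once each.
        DataSet<Tuple2<Integer, String>> unique = input.distinct();

        unique.print();
    }
}
{code}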


> distinct is missing in DataSet API documentation
> 
>
> Key: FLINK-2362
> URL: https://issues.apache.org/jira/browse/FLINK-2362
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Java API, Scala API
>Affects Versions: 0.9, 0.10
>Reporter: Fabian Hueske
>Assignee: pietro pinoli
> Fix For: 0.10, 0.9.1
>
>
> The DataSet transformation {{distinct}} is not described or listed in the 
> documentation. It is not contained in the DataSet API programming guide 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
>  and not in the DataSet Transformation 
> (https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2362] - distinct is missing in DataSet ...

2015-07-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/922#discussion_r34984880
  
--- Diff: docs/apis/programming_guide.md ---
@@ -606,7 +606,17 @@ DataSet> output = 
input.sum(0).andMin(2);
   
 
 
+
+  Distinct
+  
+Returns the distinct elements of a data set.
--- End diff --

Could you add a sentence here to explain what "distinct" means?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633514#comment-14633514
 ] 

ASF GitHub Bot commented on FLINK-2375:
---

Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122868758
  
Could you also do a git amend to reference the correct JIRA issue in the 
commit?


> Add Approximate Adamic Adar Similarity method using BloomFilters
> 
>
> Key: FLINK-2375
> URL: https://issues.apache.org/jira/browse/FLINK-2375
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Shivani Ghatge
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps:
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value.
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Using BloomFilters we increase the scalability of the algorithm. The values 
> calculated for the edges will be approximate.
> Prerequisites:
> Full understanding of the Jaccard Similarity Measure algorithm
> Reading the associated literature:
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar
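
As a plain-Java illustration of step 3 above (a standalone sketch following the formula in the description, not the Gelly code under review; all helper names are made up):

{code}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AdamicAdarSketch {

    /**
     * Weight of edge (x, y): sum over the common neighbors n of log(1 / k_n),
     * following the formula in the issue description. neighbors maps each vertex
     * id to its neighbor set; degrees maps each vertex id to its degree k_n.
     */
    static double adamicAdarWeight(long x, long y,
                                   Map<Long, Set<Long>> neighbors,
                                   Map<Long, Integer> degrees) {
        Set<Long> common = new HashSet<>(neighbors.get(x));
        common.retainAll(neighbors.get(y)); // CN = common neighbors of x and y

        double weight = 0.0;
        for (long n : common) {
            weight += Math.log(1.0 / degrees.get(n));
        }
        return weight;
    }
}
{code}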



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-122868758
  
Could you also do a git amend to reference the correct JIRA issue in the 
commit?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633525#comment-14633525
 ] 

Gabor Gevay commented on FLINK-2377:


Yes, I have found the problem: TestBaseUtils.readAllResultLines is not closing 
the readers. I will open a PR that fixes this.
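
The pattern of the fix (a sketch, not the actual TestBaseUtils code) is to close each reader once its lines have been consumed, e.g. with try-with-resources:

{code}
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class ReadAllLines {

    /** Reads all lines from the given files, making sure every reader is closed. */
    static List<String> readAllResultLines(List<String> paths) throws Exception {
        List<String> lines = new ArrayList<>();
        for (String path : paths) {
            // try-with-resources closes the reader even if an assertion or IO error
            // occurs, so Windows can delete the temp files afterwards.
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }
}
{code}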

> AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
> --
>
> Key: FLINK-2377
> URL: https://issues.apache.org/jira/browse/FLINK-2377
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows
>Reporter: Gabor Gevay
>Priority: Minor
>
> This is probably another file closing issue. (that is, Windows won't delete 
> open files, as opposed to Linux)
> I have encountered two concrete tests so far where this actually appears: 
> CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...

2015-07-20 Thread ggevay
GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/924

[FLINK-2377] Add reader.close() to readAllResultLines



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink readAllResultLinesFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #924


commit 63b71d7c2364652030fdca360b9804c344da56a3
Author: Gabor Gevay 
Date:   2015-07-20T12:39:43Z

[FLINK-2377] Add reader.close() to readAllResultLines




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633531#comment-14633531
 ] 

ASF GitHub Bot commented on FLINK-2377:
---

GitHub user ggevay opened a pull request:

https://github.com/apache/flink/pull/924

[FLINK-2377] Add reader.close() to readAllResultLines



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ggevay/flink readAllResultLinesFix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #924


commit 63b71d7c2364652030fdca360b9804c344da56a3
Author: Gabor Gevay 
Date:   2015-07-20T12:39:43Z

[FLINK-2377] Add reader.close() to readAllResultLines




> AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
> --
>
> Key: FLINK-2377
> URL: https://issues.apache.org/jira/browse/FLINK-2377
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows
>Reporter: Gabor Gevay
>Priority: Minor
>
> This is probably another file closing issue. (that is, Windows won't delete 
> open files, as opposed to Linux)
> I have encountered two concrete tests so far where this actually appears: 
> CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2381) Possible class not found Exception on failed partition producer

2015-07-20 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2381:
--

 Summary: Possible class not found Exception on failed partition 
producer
 Key: FLINK-2381
 URL: https://issues.apache.org/jira/browse/FLINK-2381
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
 Fix For: 0.10, 0.9.1


Failing the production of a result partition marks the respective partition as 
failed with a ProducerFailedException.

The cause of this exception can be a user defined class, which can only be 
loaded by the user code class loader. The network stack fails the shuffle with 
a RemoteTransportException, which has the user exception as a cause. When the 
consuming task receives this exception, this leads to a class not found 
exception, because the network stack tries to load the class with the system 
class loader.

{code}
+--+
| FAILING  |
| PRODUCER |
+--+
 || 
 \/
 ProducerFailedException(CAUSE) via network
 || 
 \/
+--+
| RECEIVER |
+--+
{code}

CAUSE is only loadable by the user code class loader.

When trying to deserialize this, RECEIVER fails with a LocalTransportException, 
which is super confusing, because the error is not local, but remote.
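
One plausible direction for a fix, sketched here as an assumption rather than the actual change: deserialize the cause with the user-code class loader, e.g. via an ObjectInputStream that resolves classes against that loader instead of the system class loader.

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

/** ObjectInputStream that resolves classes against a specific (user-code) class loader. */
public class UserCodeObjectInputStream extends ObjectInputStream {

    private final ClassLoader userCodeClassLoader;

    public UserCodeObjectInputStream(byte[] serializedCause, ClassLoader userCodeClassLoader)
            throws IOException {
        super(new ByteArrayInputStream(serializedCause));
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        // Look the class up in the user-code class loader first, so exceptions like
        // kafka.common.ConsumerRebalanceFailedException can be rebuilt on the receiver.
        return Class.forName(desc.getName(), false, userCodeClassLoader);
    }
}
{code}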

Thanks to [~rmetzger] for reporting and debugging the issue with the following 
stack trace:

{code}
Flat Map (26/120)


14:03:00,343 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask  
 - Flat Map (26/120) failed
java.lang.RuntimeException: Could not read next record.
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:745)
Caused by: 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at 
io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
at 
io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
at 
io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
Caused by: io.netty.handler.codec.DecoderException: 
java.lang.ClassNotFoundException: kafka.common.ConsumerRebalanceFailedException
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
... 12 more
Caused by: java.lang.ClassNotFoundException: 
kafka.common.ConsumerRebalanceFailedException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.

[jira] [Assigned] (FLINK-2381) Possible class not found Exception on failed partition producer

2015-07-20 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-2381:
--

Assignee: Ufuk Celebi

> Possible class not found Exception on failed partition producer
> ---
>
> Key: FLINK-2381
> URL: https://issues.apache.org/jira/browse/FLINK-2381
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Affects Versions: 0.9, master
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 0.10, 0.9.1
>
>
> Failing the production of a result partition marks the respective partition 
> as failed with a ProducerFailedException.
> The cause of this exception can be a user defined class, which can only be 
> loaded by the user code class loader. The network stack fails the shuffle 
> with a RemoteTransportException, which has the user exception as a cause. 
> When the consuming task receives this exception, this leads to a class not 
> found exception, because the network stack tries to load the class with the 
> system class loader.
> {code}
> +--+
> | FAILING  |
> | PRODUCER |
> +--+
>  || 
>  \/
>  ProducerFailedException(CAUSE) via network
>  || 
>  \/
> +--+
> | RECEIVER |
> +--+
> {code}
> CAUSE is only loadable by the user code class loader.
> When trying to deserialize this, RECEIVER fails with a 
> LocalTransportException, which is super confusing, because the error is not 
> local, but remote.
> Thanks to [~rmetzger] for reporting and debugging the issue with the 
> following stack trace:
> {code}
> Flat Map (26/120)
> 14:03:00,343 ERROR 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask   - Flat Map 
> (26/120) failed
> java.lang.RuntimeException: Could not read next record.
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:71)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> java.lang.ClassNotFoundException: 
> kafka.common.ConsumerRebalanceFailedException
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:151)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
> at 
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
> at 
> io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> ... 1 more
> Caused by: io.netty.handler.codec.DecoderException: 
> java.lang.ClassNotFoundException: 
> kafka.common.ConsumerRebalanceFailedException
> at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> ... 12 more
> Caused by: java.lang.ClassNotFoundException: 
> k

[GitHub] flink pull request: [FLINK-2377] Add reader.close() to readAllResu...

2015-07-20 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/924#issuecomment-122893574
  
Great catch! Looks good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2377) AbstractTestBase.deleteAllTempFiles sometimes fails on Windows

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633628#comment-14633628
 ] 

ASF GitHub Bot commented on FLINK-2377:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/924#issuecomment-122893574
  
Great catch! Looks good to merge.


> AbstractTestBase.deleteAllTempFiles sometimes fails on Windows
> --
>
> Key: FLINK-2377
> URL: https://issues.apache.org/jira/browse/FLINK-2377
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
> Environment: Windows
>Reporter: Gabor Gevay
>Priority: Minor
>
> This is probably another file closing issue. (that is, Windows won't delete 
> open files, as opposed to Linux)
> I have encountered two concrete tests so far where this actually appears: 
> CsvOutputFormatITCase and CollectionSourceTest.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks

2015-07-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2382:
---

 Summary: Live Metric Reporting Does Not Work for Two-Input 
StreamTasks
 Key: FLINK-2382
 URL: https://issues.apache.org/jira/browse/FLINK-2382
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Aljoscha Krettek


Also, there are no tests for the live metrics in streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2382) Live Metric Reporting Does Not Work for Two-Input StreamTasks

2015-07-20 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-2382:
-

Assignee: Maximilian Michels

> Live Metric Reporting Does Not Work for Two-Input StreamTasks
> -
>
> Key: FLINK-2382
> URL: https://issues.apache.org/jira/browse/FLINK-2382
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Aljoscha Krettek
>Assignee: Maximilian Michels
>
> Also, there are no tests for the live metrics in streaming.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2371] improve AccumulatorLiveITCase

2015-07-20 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and we
can reliably verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local single-VM setups and passes the live
accumulator map through by reference.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.
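
A minimal sketch of the synchronization pattern this describes (the class and method names below are made up and do not correspond to the actual actor messages): the test blocks until the next accumulator snapshot is reported instead of sleeping for a fixed time.

{code}
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch: notification-based waiting instead of Thread.sleep() in the test. */
public class AccumulatorSync {

    private final BlockingQueue<Map<String, Long>> updates = new LinkedBlockingQueue<>();

    /** Called when the job manager side has received new accumulator values. */
    public void onAccumulatorsReported(Map<String, Long> snapshot) {
        updates.offer(snapshot);
    }

    /** Called by the test instead of Thread.sleep(): waits for the next snapshot. */
    public Map<String, Long> awaitNextSnapshot() throws InterruptedException {
        return updates.take();
    }
}
{code}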

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #925


commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels 
Date:   2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and we
can reliably verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local single-VM setups and passes the live
accumulator map through by reference.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2371) AccumulatorLiveITCase fails

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633752#comment-14633752
 ] 

ASF GitHub Bot commented on FLINK-2371:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/925

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and we
can reliably verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local single-VM setups and passes the live
accumulator map through by reference.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink live-accumulators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #925


commit 44687e783065a4157d2d3a695d9e94070ca6e8cd
Author: Maximilian Michels 
Date:   2015-07-20T09:55:11Z

[FLINK-2371] improve AccumulatorLiveITCase

Instead of using Thread.sleep() to synchronize the checks of the
accumulator values, we rely on message passing here to synchronize the
task process.

Therefore, we let the task process signal to the task manager that it
has updated its accumulator values. The task manager lets the job
manager know and sends out the heartbeat which contains the
accumulators. When the job manager receives the accumulators and has
been notified previously, it sends a message to the subscribed test case
with the current accumulators.

This ensures that all processes are always synchronized correctly and we
can reliably verify the live accumulator results.

In the course of rewriting the test, I had to change two things in the
implementation:

a) User accumulators are now immediately serialized as well. Otherwise,
Akka does not serialize in local single-VM setups and passes the live
accumulator map through by reference.

b) The asynchronous update of the accumulators can be disabled for
tests. This was necessary because we cannot guarantee when the Future
for updating the accumulators is executed. In real setups this is
negligible.




> AccumulatorLiveITCase fails
> ---
>
> Key: FLINK-2371
> URL: https://issues.apache.org/jira/browse/FLINK-2371
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Maximilian Michels
>
> AccumulatorLiveITCase fails regularly (however, not in each run). The test 
> relies on timing (via sleep), which does not work well on Travis.
> See dev-list for more details: 
> https://mail-archives.apache.org/mod_mbox/flink-dev/201507.mbox/browser
> AccumulatorLiveITCase.testProgram:106->access$1100:68->checkFlinkAccumulators:189



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1967] Introduce (Event)time in Streamin...

2015-07-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/906#issuecomment-122949512
  
Does anyone have any objections still? The impact of the timestamps is now 
completely disabled by default.

If there are no objections I would like to merge this by tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1967) Introduce (Event)time in Streaming

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633826#comment-14633826
 ] 

ASF GitHub Bot commented on FLINK-1967:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/906#issuecomment-122949512
  
Does anyone have any objections still? The impact of the timestamps is now 
completely disabled by default.

If there are no objections I would like to merge this by tomorrow.


> Introduce (Event)time in Streaming
> --
>
> Key: FLINK-1967
> URL: https://issues.apache.org/jira/browse/FLINK-1967
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming record and a change in the 
> sources to add timestamps to records. This will also introduce punctuations 
> (or low watermarks) to allow windows to work correctly on unordered, 
> timestamped input data. In the process of this, the windowing subsystem also 
> needs to be adapted to use the punctuations. Furthermore, all operators need 
> to be made aware of punctuations and correctly forward them. Then, a new 
> operator must be introduced to allow modification of timestamps.
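
A minimal, conceptual sketch of the punctuation/low-watermark idea from the description (plain Java, not the streaming runtime's API; all names are made up): a window over unordered, timestamped input can be finalized once a watermark guarantees that no earlier element will still arrive.

{code}
import java.util.ArrayList;
import java.util.List;

/** Conceptual sketch: buffer out-of-order elements of one time window, fire on watermark. */
public class WatermarkWindowSketch {

    private final long windowEnd;                     // exclusive end timestamp of the window
    private final List<Long> buffered = new ArrayList<>();

    public WatermarkWindowSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    /** Elements may arrive unordered; only their timestamps matter for this sketch. */
    public void onElement(long timestamp) {
        if (timestamp < windowEnd) {
            buffered.add(timestamp);
        }
    }

    /**
     * A watermark w is the promise that no element with timestamp < w will arrive anymore.
     * Once w passes the window end, the window result is final and can be emitted.
     */
    public boolean onWatermark(long watermark) {
        if (watermark >= windowEnd) {
            System.out.println("window up to " + windowEnd + " complete with "
                    + buffered.size() + " elements");
            return true;
        }
        return false;
    }
}
{code}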



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-123051521
  
Updated PR
@vasia I removed the Vertex Centric Iteration :) Hope this method is okay.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123051793
  
@andralungu @vasia PR has been updated to make the code more efficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2375) Add Approximate Adamic Adar Similarity method using BloomFilters

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634119#comment-14634119
 ] 

ASF GitHub Bot commented on FLINK-2375:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/923#issuecomment-123051521
  
Updated PR
@vasia I removed the Vertex Centric Iteration :) Hope this method is okay.


> Add Approximate Adamic Adar Similarity method using BloomFilters
> 
>
> Key: FLINK-2375
> URL: https://issues.apache.org/jira/browse/FLINK-2375
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Shivani Ghatge
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps:
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value.
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Using BloomFilters we increase the scalability of the algorithm. The values 
> calculated for the edges will be approximate.
> Prerequisites:
> Full understanding of the Jaccard Similarity Measure algorithm
> Reading the associated literature:
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634123#comment-14634123
 ] 

ASF GitHub Bot commented on FLINK-2310:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123051793
  
@andralungu @vasia PR has been updated to make the code more efficient.


> Add an Adamic-Adar Similarity example
> -
>
> Key: FLINK-2310
> URL: https://issues.apache.org/jira/browse/FLINK-2310
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Andra Lungu
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps: 
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value. 
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Prerequisites: 
> - Full understanding of the Jaccard Similarity Measure algorithm
> - Reading the associated literature: 
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-07-20 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-123057981
  
The only problem with assuming NullValue whenever a value is missing is that we 
can't return NullValue in place of VV. I mean that in such a Graph, VV or EV can't 
be NullValue; otherwise that is what I was originally going for. 
Since none of the other methods for creating a DataSet/Graph provide a way to give 
the EdgeValue as NullValue, and instead expect the user to map it (at least that is 
what I saw), maybe we could just remove the functionality. I only added it because 
many examples seemed to use it, so I thought it would be nice to have. 
In any case, we could keep a single typesNullEdge method: if users don't want it, 
they can use the normal overloaded types() calls (3 arguments for no NullValue, 
2 arguments for a NullValue vertex, 1 argument for NullValue vertex and edge), and 
typesNullEdge alone would indicate that only the edges have NullValue.
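
To make the proposal above concrete, the discussed overloads might look as follows (hypothetical signatures reflecting the comment, not the API that was eventually merged):

{code}
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

/** Hypothetical reader API reflecting the overloads discussed above. */
public interface GraphCsvReaderSketch {

    // 3 arguments: key, vertex value and edge value types are all given explicitly.
    <K, VV, EV> Graph<K, VV, EV> types(Class<K> key, Class<VV> vertexValue, Class<EV> edgeValue);

    // 2 arguments: no vertex values in the file, the vertex value becomes NullValue.
    <K, EV> Graph<K, NullValue, EV> types(Class<K> key, Class<EV> edgeValue);

    // 1 argument: neither vertex nor edge values, both become NullValue.
    <K> Graph<K, NullValue, NullValue> types(Class<K> key);

    // Only the edge value is missing: vertex values are read, edges get NullValue.
    <K, VV> Graph<K, VV, NullValue> typesNullEdge(Class<K> key, Class<VV> vertexValue);
}
{code}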


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634142#comment-14634142
 ] 

ASF GitHub Bot commented on FLINK-1520:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-123057981
  
The only problem with assuming NullValue whenever a value is missing is that we 
can't return NullValue in place of VV. I mean that in such a Graph, VV or EV can't 
be NullValue; otherwise that is what I was originally going for. 
Since none of the other methods for creating a DataSet/Graph provide a way to give 
the EdgeValue as NullValue, and instead expect the user to map it (at least that is 
what I saw), maybe we could just remove the functionality. I only added it because 
many examples seemed to use it, so I thought it would be nice to have. 
In any case, we could keep a single typesNullEdge method: if users don't want it, 
they can use the normal overloaded types() calls (3 arguments for no NullValue, 
2 arguments for a NullValue vertex, 1 argument for NullValue vertex and edge), and 
typesNullEdge alone would indicate that only the edges have NullValue.


> Read edges and vertices from CSV files
> --
>
> Key: FLINK-1520
> URL: https://issues.apache.org/jira/browse/FLINK-1520
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Shivani Ghatge
>Priority: Minor
>  Labels: easyfix, newbie
>
> Add methods to create Vertex and Edge Datasets directly from CSV file inputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-685) Add support for semi-joins

2015-07-20 Thread pietro pinoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634358#comment-14634358
 ] 

pietro pinoli commented on FLINK-685:
-

Hi [~fhueske],

If I implement the dummy version (your first alternative), does it have 
a chance of getting merged? :)

Thanks again for your time.
PP

> Add support for semi-joins
> --
>
> Key: FLINK-685
> URL: https://issues.apache.org/jira/browse/FLINK-685
> Project: Flink
>  Issue Type: New Feature
>Reporter: GitHub Import
>Priority: Minor
>  Labels: github-import
> Fix For: pre-apache
>
>
> A semi-join is basically a join filter. One input is "filtering" and the 
> other one is "filtered".
> A tuple of the "filtered" input is emitted exactly once if the "filtering" 
> input has one (or more) tuples with matching join keys. That means that the 
> output of a semi-join has the same type as the "filtered" input and the 
> "filtering" input is completely discarded.
> In order to support a semi-join, we need to add an additional physical 
> execution strategy that ensures that a tuple of the "filtered" input is 
> emitted only once if the "filtering" input has more than one tuple with 
> matching keys. Furthermore, a couple of optimizations compared to standard 
> joins can be done such as storing only keys and not the full tuple of the 
> "filtering" input in a hash table.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/685
> Created by: [fhueske|https://github.com/fhueske]
> Labels: enhancement, java api, runtime, 
> Milestone: Release 0.6 (unplanned)
> Created at: Mon Apr 14 12:05:29 CEST 2014
> State: open
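
In plain Java, the described semantics and the keys-only hash table optimization look roughly like this (an illustrative sketch, not a Flink execution strategy):

{code}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class SemiJoinSketch {

    /**
     * Emits each "filtered" element exactly once if the "filtering" side contains
     * at least one element with the same key. Only the keys of the filtering side
     * are kept in the hash table, never the full tuples.
     */
    static <T, F, K> List<T> semiJoin(List<T> filtered, Function<T, K> filteredKey,
                                      List<F> filtering, Function<F, K> filteringKey) {
        Set<K> keys = new HashSet<>();
        for (F f : filtering) {
            keys.add(filteringKey.apply(f));
        }

        List<T> result = new ArrayList<>();
        for (T t : filtered) {
            // Each filtered tuple is emitted at most once; duplicate keys on the
            // filtering side do not multiply the output.
            if (keys.contains(filteredKey.apply(t))) {
                result.add(t);
            }
        }
        return result;
    }
}
{code}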



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-685) Add support for semi-joins

2015-07-20 Thread pietro pinoli (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

pietro pinoli reassigned FLINK-685:
---

Assignee: pietro pinoli

> Add support for semi-joins
> --
>
> Key: FLINK-685
> URL: https://issues.apache.org/jira/browse/FLINK-685
> Project: Flink
>  Issue Type: New Feature
>Reporter: GitHub Import
>Assignee: pietro pinoli
>Priority: Minor
>  Labels: github-import
> Fix For: pre-apache
>
>
> A semi-join is basically a join filter. One input is "filtering" and the 
> other one is "filtered".
> A tuple of the "filtered" input is emitted exactly once if the "filtering" 
> input has one (or more) tuples with matching join keys. That means that the 
> output of a semi-join has the same type as the "filtered" input and the 
> "filtering" input is completely discarded.
> In order to support a semi-join, we need to add an additional physical 
> execution strategy that ensures that a tuple of the "filtered" input is 
> emitted only once if the "filtering" input has more than one tuple with 
> matching keys. Furthermore, a couple of optimizations compared to standard 
> joins can be done such as storing only keys and not the full tuple of the 
> "filtering" input in a hash table.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/685
> Created by: [fhueske|https://github.com/fhueske]
> Labels: enhancement, java api, runtime, 
> Milestone: Release 0.6 (unplanned)
> Created at: Mon Apr 14 12:05:29 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-07-20 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123153943
  
Hi @shghatge ,

Did you also run the two examples on the cluster to make sure that the 
approximate version is faster?
Then you could also add some numbers to the two PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example

2015-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634516#comment-14634516
 ] 

ASF GitHub Bot commented on FLINK-2310:
---

Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-123153943
  
Hi @shghatge ,

Did you also run the two examples on the cluster to make sure that the 
approximate version is faster?
Then you could also add some numbers to the two PRs.


> Add an Adamic-Adar Similarity example
> -
>
> Key: FLINK-2310
> URL: https://issues.apache.org/jira/browse/FLINK-2310
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Andra Lungu
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees. In particular, it's equal to log(1/numberOfEdges).
> The Adamic-Adar algorithm can be broken into three steps: 
> 1). For each vertex, compute the log of its inverse degrees (with the formula 
> above) and set it as the vertex value. 
> 2). Each vertex will then send this new computed value along with a list of 
> neighbors to the targets of its out-edges
> 3). Weigh the edges with the Adamic-Adar index: Sum over n from CN of 
> log(1/k_n)(CN is the set of all common neighbors of two vertices x, y. k_n is 
> the degree of node n). See [2]
> Prerequisites: 
> - Full understanding of the Jaccard Similarity Measure algorithm
> - Reading the associated literature: 
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-20 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634622#comment-14634622
 ] 

Chengxiang Li commented on FLINK-1901:
--

To randomly choose a sample from a DataSet S there are basically two kinds of 
sampling requirements: sampling with a factor (such as "randomly choose 5% of 
the items in S") and sampling with a fixed size (such as "randomly choose 100 
items from S"). Besides, we do not know the size of S unless we pay the extra 
cost of computing it through DataSet::count().
# Sampling with a factor
#* With replacement: the expected sample size follows a [Poisson 
Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, 
so Poisson sampling can be used to choose the sample items.
#* Without replacement: during sampling, we can treat the decision for each item 
in the iterator as a [Bernoulli Trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with a fixed size
#* Use DataSet::count() to get the dataset size; with the fixed size, we can 
turn this into sampling with a factor.
#* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is 
another commonly used algorithm to randomly choose a sample of k items from a 
list S containing n items, where n is either very large or unknown. There are 
different reservoir sampling algorithms that support both sampling with 
replacement and sampling without replacement.
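
For the fixed-size, without-replacement case, the classic reservoir sampling algorithm (Algorithm R) keeps exactly k items while streaming once over data of unknown size. A standalone sketch (not the eventual DataSet operator):

{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

public class ReservoirSampling {

    /** Uniform sample of k items without replacement from an input of unknown size. */
    static <T> List<T> sample(Iterator<T> input, int k, long seed) {
        List<T> reservoir = new ArrayList<>(k);
        Random rnd = new Random(seed); // fixed seed for reproducibility
        long seen = 0;

        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (reservoir.size() < k) {
                reservoir.add(element);                    // fill the reservoir first
            } else {
                long j = (long) (rnd.nextDouble() * seen); // uniform index in [0, seen)
                if (j < k) {
                    reservoir.set((int) j, element);       // replace with probability k/seen
                }
            }
        }
        return reservoir;
    }
}
{code}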


> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-20 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634628#comment-14634628
 ] 

Chengxiang Li commented on FLINK-1901:
--

Hi [~tvas], I would like to contribute to this issue if no one else is 
working on it now.

> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)