[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686898#comment-15686898
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I should have clarified: by documentation I meant that, 
apart from the docstrings you have added now, we also have to include 
documentation in the Flink 
[docs](https://github.com/apache/flink/tree/master/docs/dev/libs/ml) for each 
new addition.

See for example the docs for the [standard 
scaler](https://github.com/apache/flink/blob/master/docs/dev/libs/ml/standard_scaler.md).


> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in the preprocessing package of FlinkML



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686760#comment-15686760
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @thvasilo @greghogan  

Ok, I've updated the documentation. I'll stay tuned for updating the code.

Regards
Thomas




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683519#comment-15683519
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314,

This PR is still missing documentation. After that is done, a project 
committer will have to review it before it gets merged, which might take a 
while.

Regards,
Theodore




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683478#comment-15683478
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan @thvasilo What's the next step? More tests and reviews?




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671741#comment-15671741
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Ok, I've pushed the code with my tests and some modifications to 
the mapping.

@thvasilo It seems to work perfectly! 




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667671#comment-15667671
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Excuse my ignorance, I'm only now learning about Flink internals :)
It seems like the issue here was that `partitionByRange` partitions keys in 
ascending order, but we want the end result in descending order.

@tfournier314 I think the following should work; here I use a key extractor 
that negates the key value to achieve the desired effect:

```Scala
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // Take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```
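
(A small aside for anyone running this snippet: a sketch of the imports it 
assumes under the Scala DataSet API — `Order` for the sort direction and the 
`utils` package, which provides `zipWithIndex`.)

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // adds zipWithIndex to DataSet
```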




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667423#comment-15667423
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2740
  
`zipWithIndex` preserves the order between partitions 
(DataSetUtils.java:121). @tfournier314, I don't think it's a problem pushing 
your current code since we're still discussing the PR.
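
For readers who have not looked at that class: a rough, self-contained sketch 
of the two-pass scheme `zipWithIndex` uses (an illustration of the idea only, 
not the actual `DataSetUtils` code; all names below are made up):

```scala
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object ZipWithIndexSketch {
  // Pass 1: emit (subtaskIndex, elementCount) for every parallel partition.
  // Pass 2: each partition starts numbering at the sum of the counts of all
  // lower-numbered partitions, so the indices follow the partition order.
  def zipWithIndexSketch(input: DataSet[String]): DataSet[(Long, String)] = {
    val counts = input.mapPartition(new RichMapPartitionFunction[String, (Int, Long)] {
      override def mapPartition(values: java.lang.Iterable[String],
                                out: Collector[(Int, Long)]): Unit =
        out.collect((getRuntimeContext.getIndexOfThisSubtask, values.asScala.size.toLong))
    })

    input.mapPartition(new RichMapPartitionFunction[String, (Long, String)] {
      private var offset = 0L

      override def open(parameters: Configuration): Unit = {
        val perPartition = getRuntimeContext
          .getBroadcastVariable[(Int, Long)]("counts").asScala
        offset = perPartition
          .filter(_._1 < getRuntimeContext.getIndexOfThisSubtask)
          .map(_._2).sum
      }

      override def mapPartition(values: java.lang.Iterable[String],
                                out: Collector[(Long, String)]): Unit = {
        var index = offset
        for (v <- values.asScala) {
          out.collect((index, v))
          index += 1
        }
      }
    }).withBroadcastSet(counts, "counts")
  }
}
```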




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15666809#comment-15666809
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I tested your code and it does seem that partitions are 
sorted only internally, which is expected. `zipWithIndex` is AFAIK unaware of 
the sorted (as in key range) order of partitions, so it's not guaranteed that 
the "first" partition will get the `[0, partitionSize-1]` indices, the second 
`[partitionSize, 2*partitionSize]` etc. Maybe @greghogan knows a solution for 
global sorting?

If it's not possible, I think we can take a step back and see what we are 
trying to achieve here.

The task is to count the frequencies of labels and assign integer ids to 
them in order of frequency. The labels should either be categorical variables 
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique 
values might be vocabulary words, which will range in the few million unique 
values at worst.

I would argue then that, after we have performed the frequency count in a 
distributed manner, there is no need to do the last step, assigning ordered 
indices, in a distributed manner as well; we can assume that all the 
(label -> frequency) values fit into the memory of one machine.

So I would recommend gathering all the data into one partition after getting 
the counts; that way we guarantee a global ordering:

```{Scala}
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and 
documentation.
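
For reference, a minimal end-to-end sketch of this single-partition approach 
(a hedged illustration, assuming the Scala DataSet API of that era; the 
object name is made up and the example data reuses `fitData` from earlier in 
the thread):

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex

object SinglePartitionIndexing {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fitData = env.fromCollection(
      List("a", "b", "c", "a", "a", "d", "a", "a", "a", "b", "b", "c", "a", "c", "b", "c", "b"))

    val mapping = fitData
      .map(s => (s, 1))
      .groupBy(0)
      .sum(1)
      .partitionByRange(x => 0)           // constant key: everything lands in one partition
      .sortPartition(1, Order.DESCENDING) // global order, since there is only one partition
      .zipWithIndex                       // most frequent label gets index 0
      .map { case (id, (label, _)) => (label, id) }

    mapping.print() // expected: (a,0), (b,1), (c,2), (d,3)
  }
}
```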




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665260#comment-15665260
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan I've not pushed the code yet because my tests are still 
incorrect.
Indeed the following code:

val env = ExecutionEnvironment.getExecutionEnvironment
val fitData = env.fromCollection(List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c","b"))
fitData.map(s => (s,1)).groupBy(0)
  .reduce((a,b) => (a._1, a._2 + b._2))
  .partitionByRange(1)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()

returns 

(0,(b,5))
(1,(c,4))
(2,(d,1))
(3,(a,7))

And I would like the following:

(1,(b,5))
(2,(c,4))
(3,(d,1))
(0,(a,7))

Even if the order inside each partition is preserved (with mapPartition), the 
order between partitions is not right, is it?






[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664652#comment-15664652
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87863825
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.extensions.acceptPartialFunctions
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+  * String Indexer
+  */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None
+
+
+  def setHandleInvalid(value: String): this.type ={
+    parameters.add( HandleInvalid, value )
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some( "skip" )
+  }
+
+  //  Factory methods ==============================================================
+
+  def apply(): StringIndexer ={
+    new StringIndexer( )
+  }
+
+  // == Operations =================================================================
+
+  /**
+    * Trains [[StringIndexer]] by learning the count of each string in the input DataSet.
+    */
+
+  implicit def fitStringIndexer ={
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap, input: DataSet[String]):Unit ={
+        val metrics = extractIndices( input )
+        instance.metricsOption = Some( metrics )
+      }
+    }
+  }
+
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

`zipWithIndex` is implemented with `mapPartition` which does not change the 
partitioning or ordering. You've referenced the correct implementation but the 
code is still missing the `sortPartition(1, Order.DESCENDING)`.
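
To make the suggestion concrete, here is a hedged sketch of the quoted helper 
with the missing sort added (illustrative only, keeping the PR's names; as 
other comments in this thread note, sorting per partition still does not by 
itself give one globally ordered sequence unless all data ends up in a single 
partition):

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex

object ExtractIndicesSketch {
  // Frequency count, per-partition descending sort, then index assignment.
  def extractIndices(input: DataSet[String]): DataSet[(String, Long)] =
    input
      .map(s => (s, 1))
      .groupBy(0)
      .sum(1)                             // same counts as the reduce in the diff
      .partitionByRange(1)
      .sortPartition(1, Order.DESCENDING) // the sort step the comment refers to
      .zipWithIndex
      .map { case (id, (label, _)) => (label, id) }
}
```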




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664651#comment-15664651
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87860878
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
[...]
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
--- End diff --

Can this be replaced with `.sum(1)`?
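
For illustration, the two forms side by side (a sketch assuming 
`input: DataSet[String]` as in the quoted diff; for a `(String, Int)` tuple 
the grouped `sum(1)` aggregation yields the same counts as the explicit 
reduce):

```scala
import org.apache.flink.api.scala._

object CountComparison {
  // Explicit reduce over the grouped (label, 1) tuples, as in the diff.
  def countsViaReduce(input: DataSet[String]): DataSet[(String, Int)] =
    input.map(s => (s, 1)).groupBy(0).reduce((a, b) => (a._1, a._2 + b._2))

  // Suggested simplification: built-in aggregation on the count field.
  def countsViaSum(input: DataSet[String]): DataSet[(String, Int)] =
    input.map(s => (s, 1)).groupBy(0).sum(1)
}
```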




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652290#comment-15652290
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87295380
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
[...]
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

Indeed, I need to do a global sort, because mapping is a sorted 
DataSet[(String,Long)] of (label, index) pairs, where the most frequent item 
has index 0.

I need to sort a DataSet of (label, frequency) pairs, then zipWithIndex to 
get the associated index.

I've just realised that sortPartition() will only sort my partitions 
locally, so how can I achieve a global sort this way?
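
Two ways to get that global order, sketched under the same DataSet API 
(hedged; `counts` stands for the (label, frequency) DataSet built above, and 
both variants funnel all data through a single sorting task, which later 
comments argue is acceptable for label sets that fit on one machine):

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex

object GlobalSortOptions {
  // Option 1: a constant range key sends everything to one partition,
  // so sorting that partition is a global sort.
  def viaSinglePartition(counts: DataSet[(String, Int)]): DataSet[(Long, (String, Int))] =
    counts
      .partitionByRange(x => 0)
      .sortPartition(1, Order.DESCENDING)
      .zipWithIndex

  // Option 2: run the partition sort with parallelism 1; the single sorted
  // partition is then globally ordered by construction.
  def viaParallelismOne(counts: DataSet[(String, Int)]): DataSet[(Long, (String, Int))] =
    counts
      .sortPartition(1, Order.DESCENDING)
      .setParallelism(1)
      .zipWithIndex
}
```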







[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650936#comment-15650936
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r87192044
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,108 @@
[...]
+  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {
+
+    val mapping = input
+      .mapWith( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .partitionByRange( 1 )
--- End diff --

Could you explain what the mapping is doing here? If you are trying to sort, 
shouldn't you be using `.sortPartition()` after the partitioning?




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650787#comment-15650787
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@thvasilo @greghogan I've updated my code so that I'm streaming instead of 
caching with a collect(). Does it seem OK to you?




[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636993#comment-15636993
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
I've changed my code so that I now have mapping: DataSet[(String,Long)]:

val mapping = input
  .mapWith( s => (s, 1) )
  .groupBy( 0 )
  .reduce( (a, b) => (a._1, a._2 + b._2) )
  .partitionByRange( 1 )
  .zipWithIndex
  .mapWith { case (id, (label, count)) => (label, id) }

When parsing a new DataSet[String] called rawInput, I'd like to use this 
mapping to associate with each "label" of rawInput an ID (the Long value of 
the mapping).

Is it possible with a streaming approach (using a join, for example)?
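
One possible shape for that lookup, sketched as a batch join (hedged; this is 
not code from the PR — an inner join keeps only labels seen during fit, which 
matches the `HandleInvalid = "skip"` behaviour, while the error mode would 
need an outer join plus an explicit check):

```scala
import org.apache.flink.api.scala._

object IndexLookup {
  // mapping: (label, id) as built above; rawInput: unindexed labels.
  def transform(rawInput: DataSet[String],
                mapping: DataSet[(String, Long)]): DataSet[(String, Long)] =
    rawInput
      .join(mapping)
      .where(s => s)   // key: the raw label itself
      .equalTo(_._1)   // key: the label field of the mapping
      .map { case (label, (_, id)) => (label, id) }
}
```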





[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633603#comment-15633603
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r86402282
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+  * StringIndexer maps a DataSet[String] to a DataSet[(String,Int)] where each label is
+  * associated with an index.The indices are in [0,numLabels) and are ordered by label
+  * frequencies. The most frequent label has index 0.
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[String] = env.fromCollection(data)
+  *   val transformer = StringIndexer().setHandleInvalid("skip")
+  *
+  *   transformer.fit(trainingDS)
+  *   val transformedDS = transformer.transform(trainingDS)
+  * }}}
+  *
+  *
+  * You can manage unseen labels using HandleInvalid parameter. If HandleInvalid is
+  * set to "skip" (see example),then each line containing an unseen label is skipped.
+  * Otherwise an exception is raised.
+  *
+  * =Parameters=
+  *
+  * -[[HandleInvalid]]: Define how to handle unseen labels: by default is "skip"
+  *
+  *
+  */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[Map[String, Int]] = None
+
+
+  /**
+    * Set the value to handle unseen labels
+    * @param value set to "skip" if you want to filter line with unseen labels
+    * @return StringIndexer instance with HandleInvalid value
+    */
+  def setHandleInvalid(value: String): this.type ={
+    parameters.add( HandleInvalid, value )
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some( "skip" )
+  }
+
+  //  Factory methods ============================================================
+
+  def apply(): StringIndexer ={
+    new StringIndexer( )
+  }
+
+  // == Operations ===============================================================
+
+  /**
+    *  Trains [[StringIndexer]] by learning the count of each labels in the input DataSet.
+    *
+    * @return [[FitOperation]] training the [[StringIndexer]] on string labels
+    */
+  implicit def fitStringIndexer ={
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap,
+        input: DataSet[String]): Unit = {
+        val metrics = extractIndices( input )
+        instance.metricsOption = Some( metrics )
+      }
+    }
+  }
+
+  /**
+    * Count the frequency of each label, sort them in a decreasing order and assign an index
+    *
+    * @param input input Dataset containing labels
+    * @return a map that returns for each label (key) its index (value)
+    */
+  private def extractIndices(input: DataSet[String]): Map[String, Int] ={
+
+    implicit val resultTypeInformation = createTypeInformation[(String, Int)]
+
+    val mapper = input
+      .map( s => (s, 1) )
+      .groupBy( 0 )
+      .reduce( (a, b) => (a._1, a._2 + b._2) )
+      .collect( )

[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633599#comment-15633599
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r86402178
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
[...]

[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633463#comment-15633463
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r86391698
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
[...]

[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633464#comment-15633464
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r86391230
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
[...]

[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633465#comment-15633465
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2740#discussion_r86392126
  
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
[...]

[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer

2016-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630729#comment-15630729
 ] 

ASF GitHub Bot commented on FLINK-4964:
---

Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Yes, I've just updated the PR title

