[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686898#comment-15686898 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2740

Hello @tfournier314, I should have clarified: by documentation I meant that, apart from the docstrings you have added now, we also have to include documentation in the Flink [docs](https://github.com/apache/flink/tree/master/docs/dev/libs/ml) for each new addition. See for example the docs for the [standard scaler](https://github.com/apache/flink/blob/master/docs/dev/libs/ml/standard_scaler.md).

> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
> Issue Type: New Feature
> Reporter: Thomas FOURNIER
> Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in the preprocessing package of FlinkML

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15686760#comment-15686760 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

Hello @thvasilo @greghogan OK, I've updated the documentation. I'll stay tuned for updating the code. Regards, Thomas
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683519#comment-15683519 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2740

Hello @tfournier314, this PR is still missing documentation. After that is done, a project committer will have to review it before it gets merged, which might take a while. Regards, Theodore
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683478#comment-15683478 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

@greghogan @thvasilo What's the next step? More tests and reviews?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671741#comment-15671741 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

@greghogan OK, I've pushed the code with my tests and some modifications to the mapping. @thvasilo It seems to work perfectly!
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667671#comment-15667671 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2740

@greghogan Excuse my ignorance, I'm only now learning about Flink internals :) It seems the issue here was that `partitionByRange` partitions keys in ascending order, but we want the end result in descending order. @tfournier314 I think the following should work; here I use a key extractor that negates the value of the key to achieve the desired effect:

```Scala
fitData.map(s => (s, 1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // Take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15667423#comment-15667423 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2740

`zipWithIndex` preserves the order between partitions (DataSetUtils.java:121). @tfournier314, I don't think it's a problem pushing your current code since we're still discussing the PR.
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15666809#comment-15666809 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user thvasilo commented on the issue: https://github.com/apache/flink/pull/2740

Hello @tfournier314, I tested your code and it does seem that partitions are only sorted internally, which is expected. `zipWithIndex` is AFAIK unaware of the sorted (as in key range) order of partitions, so it's not guaranteed that the "first" partition will get the `[0, partitionSize-1]` indices, the second `[partitionSize, 2*partitionSize]`, etc. Maybe @greghogan knows a solution for global sorting?

If it's not possible, I think we can take a step back and look at what we are trying to achieve here. The task is to count the frequencies of labels and assign integer ids to them in order of frequency. The labels should either be categorical variables (e.g. {Male, Female, Unknown}) or class labels. The case with the most unique values would be vocabulary words, which range in the few million unique values at worst. I would argue then that, after we have performed the frequency count in a distributed manner, there is no need to assign the ordered indices in a distributed manner as well; we can assume that all the (label -> frequency) pairs fit into the memory of one machine. So I would recommend gathering all data into one partition after getting the counts; that way we guarantee a global ordering:

```Scala
fitData.map(s => (s, 1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0) // Send everything to a single partition
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and documentation.
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15665260#comment-15665260 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

@greghogan I've not pushed the code yet because my tests are still incorrect. Indeed, the following code:

```Scala
val env = ExecutionEnvironment.getExecutionEnvironment
val fitData = env.fromCollection(
  List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c","b"))

fitData.map(s => (s, 1)).groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))
  .partitionByRange(1)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

returns

```
(0,(b,5))
(1,(c,4))
(2,(d,1))
(3,(a,7))
```

and I would like the following:

```
(1,(b,5))
(2,(c,4))
(3,(d,1))
(0,(a,7))
```

Even if the order inside partitions is preserved (with mapPartition), isn't the order between partitions wrong?
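For reference, the mapping this discussion is converging on can be computed with plain Scala collections on the same test data. This is only a sketch of the expected result (the `ExpectedMapping` object is illustrative, not part of the PR), useful as an oracle for the distributed version:

```scala
// Expected StringIndexer result, computed locally:
// the most frequent label gets index 0, the next one index 1, and so on.
object ExpectedMapping {
  val fitData =
    List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c","b")

  // Count label frequencies, sort by descending count, then zip with indices.
  val mapping: Map[String, Long] =
    fitData.groupBy(identity)
      .map { case (label, occurrences) => (label, occurrences.size) }
      .toList
      .sortBy { case (_, count) => -count } // global descending order by count
      .zipWithIndex
      .map { case ((label, _), index) => (label, index.toLong) }
      .toMap
}
```

Here `ExpectedMapping.mapping` is `Map("a" -> 0, "b" -> 1, "c" -> 2, "d" -> 3)`, matching the counts above (a appears 7 times, b 5, c 4, d 1); a correct distributed implementation should agree with this on any input that fits in memory.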
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664652#comment-15664652 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r87863825

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---

```Scala
package org.apache.flink.ml.preprocessing

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions.acceptPartialFunctions
import org.apache.flink.api.scala.utils._
import org.apache.flink.ml.common.{Parameter, ParameterMap}
import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid

import scala.collection.immutable.Seq

/**
 * String Indexer
 */
class StringIndexer extends Transformer[StringIndexer] {

  private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None

  def setHandleInvalid(value: String): this.type = {
    parameters.add(HandleInvalid, value)
    this
  }
}

object StringIndexer {

  case object HandleInvalid extends Parameter[String] {
    val defaultValue: Option[String] = Some("skip")
  }

  // Factory methods

  def apply(): StringIndexer = {
    new StringIndexer()
  }

  // Operations

  /**
   * Trains [[StringIndexer]] by learning the count of each string in the input DataSet.
   */
  implicit def fitStringIndexer = {
    new FitOperation[StringIndexer, String] {
      def fit(instance: StringIndexer, fitParameters: ParameterMap,
          input: DataSet[String]): Unit = {
        val metrics = extractIndices(input)
        instance.metricsOption = Some(metrics)
      }
    }
  }

  private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = {

    val mapping = input
      .mapWith(s => (s, 1))
      .groupBy(0)
      .reduce((a, b) => (a._1, a._2 + b._2))
      .partitionByRange(1) // <-- review comment anchors here
```

--- End diff --

`zipWithIndex` is implemented with `mapPartition`, which does not change the partitioning or ordering. You've referenced the correct implementation but the code is still missing the `sortPartition(1, Order.DESCENDING)`.
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664651#comment-15664651 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r87860878

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
(Same quoted context as in the previous comment, anchored on the line `.reduce( (a, b) => (a._1, a._2 + b._2) )` in `extractIndices`.)

--- End diff --

Can this be replaced with `.sum(1)`?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652290#comment-15652290 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r87295380

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
(Same quoted context as in the previous comments, anchored on `.partitionByRange( 1 )` in `extractIndices`.)

--- End diff --

Indeed, I need to do a global sort, because `mapping` is a sorted DataSet[(String, Long)] of (label, index) pairs, where the most frequent item has index 0. I need to sort a DataSet of (label, frequency) pairs and then `zipWithIndex` to get the associated index. I've just realised that `sortPartition()` will only sort my partitions locally, so how can I achieve a global sort this way?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650936#comment-15650936 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r87192044

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
(Same quoted context as in the previous comments, anchored on `.partitionByRange( 1 )` in `extractIndices`.)

--- End diff --

Could you explain what the mapping is doing here? If you are trying to sort, shouldn't you be using `.sortPartition()` after the partition?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650787#comment-15650787 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

@thvasilo @greghogan I've updated my code so that I'm streaming instead of caching with a collect(). Does it seem OK to you?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636993#comment-15636993 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740

I've changed my code so that I now have mapping: DataSet[(String, Long)]:

```Scala
val mapping = input
  .mapWith(s => (s, 1))
  .groupBy(0)
  .reduce((a, b) => (a._1, a._2 + b._2))
  .partitionByRange(1)
  .zipWithIndex
  .mapWith { case (id, (label, count)) => (label, id) }
```

Parsing a new DataSet[String] called rawInput, I'd like to use this mapping to associate each "label" of rawInput with an ID (the Long value of mapping). Is it possible with a streaming approach (using a join, for example)?
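The lookup asked about above would indeed be a join of `rawInput` with `mapping` on the label field in the DataSet API. The "skip" semantics for unseen labels can be sketched with plain Scala collections (the `TransformSketch` object, its sample `mapping`, and the `handleInvalid` parameter are illustrative assumptions, not the PR's actual API):

```scala
// Sketch of the transform step: look each raw label up in the learned mapping
// and either drop unseen labels (handleInvalid = "skip") or fail on them.
object TransformSketch {
  // A small pre-learned mapping, standing in for the DataSet produced by fit.
  val mapping: Map[String, Long] = Map("a" -> 0L, "b" -> 1L, "c" -> 2L)

  def transform(rawInput: List[String],
                handleInvalid: String = "skip"): List[(String, Long)] =
    rawInput.flatMap { label =>
      mapping.get(label) match {
        case Some(id)                         => Some((label, id))
        case None if handleInvalid == "skip"  => None // drop unseen labels
        case None =>
          throw new IllegalArgumentException(s"Unseen label: $label")
      }
    }
}
```

For example, `TransformSketch.transform(List("a", "x", "c"))` yields `List(("a", 0), ("c", 2))`, silently dropping the unseen `"x"`. In a distributed setting an inner join gives the "skip" behaviour for free, since unmatched labels simply produce no output pairs.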
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633603#comment-15633603 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r86402282

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---

```Scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.ml.preprocessing

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{Parameter, ParameterMap}
import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid

import scala.collection.immutable.Seq

/**
 * StringIndexer maps a DataSet[String] to a DataSet[(String,Int)] where each label is
 * associated with an index. The indices are in [0, numLabels) and are ordered by label
 * frequencies. The most frequent label has index 0.
 *
 * @example
 * {{{
 *   val trainingDS: DataSet[String] = env.fromCollection(data)
 *   val transformer = StringIndexer().setHandleInvalid("skip")
 *
 *   transformer.fit(trainingDS)
 *   val transformedDS = transformer.transform(trainingDS)
 * }}}
 *
 * You can manage unseen labels using the HandleInvalid parameter. If HandleInvalid is
 * set to "skip" (see example), then each line containing an unseen label is skipped.
 * Otherwise an exception is raised.
 *
 * =Parameters=
 *
 * - [[HandleInvalid]]: defines how to handle unseen labels; the default is "skip"
 */
class StringIndexer extends Transformer[StringIndexer] {

  private[preprocessing] var metricsOption: Option[Map[String, Int]] = None

  /**
   * Set the value to handle unseen labels
   *
   * @param value set to "skip" if you want to filter lines with unseen labels
   * @return StringIndexer instance with HandleInvalid value
   */
  def setHandleInvalid(value: String): this.type = {
    parameters.add(HandleInvalid, value)
    this
  }
}

object StringIndexer {

  case object HandleInvalid extends Parameter[String] {
    val defaultValue: Option[String] = Some("skip")
  }

  // Factory methods

  def apply(): StringIndexer = {
    new StringIndexer()
  }

  // Operations

  /**
   * Trains [[StringIndexer]] by learning the count of each label in the input DataSet.
   *
   * @return [[FitOperation]] training the [[StringIndexer]] on string labels
   */
  implicit def fitStringIndexer = {
    new FitOperation[StringIndexer, String] {
      def fit(instance: StringIndexer, fitParameters: ParameterMap,
          input: DataSet[String]): Unit = {
        val metrics = extractIndices(input)
        instance.metricsOption = Some(metrics)
      }
    }
  }

  /**
   * Count the frequency of each label, sort them in decreasing order and assign an index
   *
   * @param input input DataSet containing labels
   * @return a map that returns for each label (key) its index (value)
   */
  private def extractIndices(input: DataSet[String]): Map[String, Int] = {

    implicit val resultTypeInformation = createTypeInformation[(String, Int)]

    val mapper = input
      .map(s => (s, 1))
      .groupBy(0)
      .reduce((a, b) => (a._1, a._2 + b._2))
      .collect() // <-- review comment anchors here
```
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633599#comment-15633599 ]

ASF GitHub Bot commented on FLINK-4964:
---
Github user tfournier314 commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r86402178

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
(Same quoted context as in the previous comment, anchored on the `.collect( )` line in `extractIndices`.)

--- End
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633463#comment-15633463 ] ASF GitHub Bot commented on FLINK-4964: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r86391698

--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
+import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
+
+import scala.collection.immutable.Seq
+
+/**
+ * StringIndexer maps a DataSet[String] to a DataSet[(String, Int)] where each label is
+ * associated with an index. The indices are in [0, numLabels) and are ordered by label
+ * frequency: the most frequent label has index 0.
+ *
+ * @example
+ * {{{
+ *   val trainingDS: DataSet[String] = env.fromCollection(data)
+ *   val transformer = StringIndexer().setHandleInvalid("skip")
+ *
+ *   transformer.fit(trainingDS)
+ *   val transformedDS = transformer.transform(trainingDS)
+ * }}}
+ *
+ * Unseen labels can be managed with the HandleInvalid parameter. If HandleInvalid is
+ * set to "skip" (see example), then each line containing an unseen label is skipped;
+ * otherwise an exception is raised.
+ *
+ * =Parameters=
+ *
+ * - [[HandleInvalid]]: defines how to handle unseen labels; the default is "skip"
+ */
+class StringIndexer extends Transformer[StringIndexer] {
+
+  private[preprocessing] var metricsOption: Option[Map[String, Int]] = None
+
+  /**
+    * Sets the value used to handle unseen labels.
+    *
+    * @param value set to "skip" to filter out lines with unseen labels
+    * @return StringIndexer instance with the HandleInvalid value set
+    */
+  def setHandleInvalid(value: String): this.type = {
+    parameters.add(HandleInvalid, value)
+    this
+  }
+
+}
+
+object StringIndexer {
+
+  case object HandleInvalid extends Parameter[String] {
+    val defaultValue: Option[String] = Some("skip")
+  }
+
+  // Factory methods
+
+  def apply(): StringIndexer = {
+    new StringIndexer()
+  }
+
+  // == Operations ==
+
+  /**
+    * Trains a [[StringIndexer]] by learning the count of each label in the input DataSet.
+    *
+    * @return [[FitOperation]] training the [[StringIndexer]] on string labels
+    */
+  implicit def fitStringIndexer = {
+    new FitOperation[StringIndexer, String] {
+      def fit(instance: StringIndexer, fitParameters: ParameterMap,
+              input: DataSet[String]): Unit = {
+        val metrics = extractIndices(input)
+        instance.metricsOption = Some(metrics)
+      }
+    }
+  }
+
+  /**
+    * Counts the frequency of each label, sorts them in decreasing order and assigns an index.
+    *
+    * @param input input DataSet containing labels
+    * @return a map that returns for each label (key) its index (value)
+    */
+  private def extractIndices(input: DataSet[String]): Map[String, Int] = {
+
+    implicit val resultTypeInformation = createTypeInformation[(String, Int)]
+
+    val mapper = input
+      .map(s => (s, 1))
+      .groupBy(0)
+      .reduce((a, b) => (a._1, a._2 + b._2))
+      .collect()
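The quoted diff ends at the `collect()` call, before the collected `(label, count)` pairs are turned into the frequency-ordered index map that the docstring describes. The following sketch, using plain Scala collections and hypothetical names (it is not the PR's actual completion of `extractIndices`), illustrates that indexing step:

```scala
// Illustrative only: turn collected (label, count) pairs into a map from
// label to index, where index 0 goes to the most frequent label.
object IndexFromCounts {
  def toIndexMap(counts: Seq[(String, Int)]): Map[String, Int] =
    counts
      .sortBy { case (_, count) => -count } // most frequent label first
      .map { case (label, _) => label }     // drop the counts
      .zipWithIndex                         // assign indices 0 .. numLabels-1
      .toMap

  def main(args: Array[String]): Unit = {
    val counts = Seq(("a", 3), ("b", 1), ("c", 2))
    // "a" is most frequent, so it maps to 0; then "c" -> 1, "b" -> 2
    println(IndexFromCounts.toIndexMap(counts))
  }
}
```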
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633464#comment-15633464 ] ASF GitHub Bot commented on FLINK-4964: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r86391230 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633465#comment-15633465 ] ASF GitHub Bot commented on FLINK-4964: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r86392126 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala ---
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630729#comment-15630729 ] ASF GitHub Bot commented on FLINK-4964: --- Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740 Yes, I've just updated the PR title

> FlinkML - Add StringIndexer
> ---
>
> Key: FLINK-4964
> URL: https://issues.apache.org/jira/browse/FLINK-4964
> Project: Flink
> Issue Type: New Feature
> Reporter: Thomas FOURNIER
> Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML

-- This message was sent by Atlassian JIRA (v6.3.4#6332)