[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664652#comment-15664652 ]
ASF GitHub Bot commented on FLINK-4964: --------------------------------------- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2740#discussion_r87863825 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala --- @@ -0,0 +1,108 @@ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.api.scala.utils._ +import org.apache.flink.ml.common.{Parameter, ParameterMap} +import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer} +import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid + +import scala.collection.immutable.Seq + +/** + * String Indexer + */ +class StringIndexer extends Transformer[StringIndexer] { + + private[preprocessing] var metricsOption: Option[DataSet[(String, Long)]] = None + + + def setHandleInvalid(value: String): this.type ={ + parameters.add( HandleInvalid, value ) + this + } + +} + +object StringIndexer { + + case object HandleInvalid extends Parameter[String] { + val defaultValue: Option[String] = Some( "skip" ) + } + + // ==================================== Factory methods ========================================== + + def apply(): StringIndexer ={ + new StringIndexer( ) + } + + // ====================================== Operations ============================================= + + /** + * Trains [[StringIndexer]] by learning the count of each string in the input DataSet. 
+ */ + + implicit def fitStringIndexer ={ + new FitOperation[StringIndexer, String] { + def fit(instance: StringIndexer, fitParameters: ParameterMap, input: DataSet[String]):Unit ={ + val metrics = extractIndices( input ) + instance.metricsOption = Some( metrics ) + } + } + } + + private def extractIndices(input: DataSet[String]): DataSet[(String, Long)] = { + + val mapping = input + .mapWith( s => (s, 1) ) + .groupBy( 0 ) + .reduce( (a, b) => (a._1, a._2 + b._2) ) + .partitionByRange( 1 ) --- End diff -- `zipWithIndex` is implemented with `mapPartition`, which does not change the partitioning or ordering. You've referenced the correct implementation, but the code is still missing the `sortPartition(1, Order.DESCENDING)` call. > FlinkML - Add StringIndexer > --------------------------- > > Key: FLINK-4964 > URL: https://issues.apache.org/jira/browse/FLINK-4964 > Project: Flink > Issue Type: New Feature > Reporter: Thomas FOURNIER > Priority: Minor > > Add StringIndexer as described here: > http://spark.apache.org/docs/latest/ml-features.html#stringindexer > This will be added in the `preprocessing` package of FlinkML -- This message was sent by Atlassian JIRA (v6.3.4#6332)