[ 
https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664652#comment-15664652
 ] 

ASF GitHub Bot commented on FLINK-4964:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2740#discussion_r87863825
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StringIndexer.scala
 ---
    @@ -0,0 +1,108 @@
    +package org.apache.flink.ml.preprocessing
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.extensions.acceptPartialFunctions
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{Parameter, ParameterMap}
    +import org.apache.flink.ml.pipeline.{FitOperation, 
TransformDataSetOperation, Transformer}
    +import org.apache.flink.ml.preprocessing.StringIndexer.HandleInvalid
    +
    +import scala.collection.immutable.Seq
    +
    +/**
    +  * String Indexer
    +  */
    +class StringIndexer extends Transformer[StringIndexer] {
    +
    +  private[preprocessing] var metricsOption: Option[DataSet[(String, 
Long)]] = None
    +
    +
    +  def setHandleInvalid(value: String): this.type ={
    +    parameters.add( HandleInvalid, value )
    +    this
    +  }
    +
    +}
    +
    +object StringIndexer {
    +
    +  case object HandleInvalid extends Parameter[String] {
    +    val defaultValue: Option[String] = Some( "skip" )
    +  }
    +
    +  // ==================================== Factory methods 
==========================================
    +
    +  def apply(): StringIndexer ={
    +    new StringIndexer( )
    +  }
    +
    +  // ====================================== Operations 
=============================================
    +
    +  /**
    +    * Trains [[StringIndexer]] by learning the count of each string in the 
input DataSet.
    +    */
    +
    +  implicit def fitStringIndexer ={
    +    new FitOperation[StringIndexer, String] {
    +      def fit(instance: StringIndexer, fitParameters: ParameterMap, input: 
DataSet[String]):Unit ={
    +        val metrics = extractIndices( input )
    +        instance.metricsOption = Some( metrics )
    +      }
    +    }
    +  }
    +
    +  private def extractIndices(input: DataSet[String]): DataSet[(String, 
Long)] = {
    +
    +    val mapping = input
    +      .mapWith( s => (s, 1) )
    +      .groupBy( 0 )
    +      .reduce( (a, b) => (a._1, a._2 + b._2) )
    +      .partitionByRange( 1 )
    --- End diff --
    
    `zipWithIndex` is implemented with `mapPartition`, which does not change the 
partitioning or ordering. You've referenced the correct implementation, but the 
code is still missing the `sortPartition(1, Order.DESCENDING)` call.


> FlinkML - Add StringIndexer
> ---------------------------
>
>                 Key: FLINK-4964
>                 URL: https://issues.apache.org/jira/browse/FLINK-4964
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Thomas FOURNIER
>            Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added to the `preprocessing` package of FlinkML.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to