Github user thunterdb commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15770#discussion_r101665899
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.ml.clustering
    +
    +import org.apache.spark.annotation.{Experimental, Since}
    +import org.apache.spark.ml.Transformer
    +import org.apache.spark.ml.linalg.Vector
    +import org.apache.spark.ml.param._
    +import org.apache.spark.ml.param.shared._
    +import org.apache.spark.ml.util._
    +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering}
    +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{DataFrame, Dataset, Row}
    +import org.apache.spark.sql.functions.col
    +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
    +
    +/**
    + * Common params for PowerIterationClustering
    + */
    +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
    +  with HasFeaturesCol with HasPredictionCol {
    +
    +  /**
    +   * The number of clusters to create (k). Must be > 1. Default: 2.
    +   * @group param
    +   */
    +  @Since("2.2.0")
    +  final val k = new IntParam(this, "k", "The number of clusters to create. " +
    +    "Must be > 1.", ParamValidators.gt(1))
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getK: Int = $(k)
    +
    +  /**
    +   * Param for the initialization algorithm. This can be either "random" to use a random vector
    +   * as vertex properties, or "degree" to use normalized sum similarities. Default: random.
    +   */
    +  @Since("2.2.0")
    +  final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " +
    +    "Supported options: 'random' and 'degree'.",
    +    (value: String) => validateInitMode(value))
    +
    +  private[spark] def validateInitMode(initMode: String): Boolean = {
    +    initMode match {
    +      case "random" => true
    +      case "degree" => true
    +      case _ => false
    +    }
    +  }
    +
    +  /** @group expertGetParam */
    +  @Since("2.2.0")
    +  def getInitMode: String = $(initMode)
    +
    +  /**
    +   * Param for the column name for ids returned by [[PowerIterationClustering.transform()]].
    +   * Default: "id"
    +   * @group param
    +   */
    +  final val idCol = new Param[String](this, "idCol", "column name for ids.")
    +
    +  /** @group getParam */
    +  def getIdCol: String = $(idCol)
    +
    +  /**
    +   * Validates the input schema
    +   * @param schema input schema
    +   */
    +  protected def validateSchema(schema: StructType): Unit = {
    +    SchemaUtils.checkColumnType(schema, $(idCol), LongType)
    +    SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
    +  }
    +}
    +
    +/**
    + * :: Experimental ::
    + * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
    + * [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. From the abstract: PIC finds a very
    + * low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise
    + * similarity matrix of the data.
    + *
    + * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an
    + * expensive operation, because it uses the PIC algorithm to cluster the whole input dataset.
    + *
    + * @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]]
    + */
    +@Since("2.2.0")
    +@Experimental
    +class PowerIterationClustering private[clustering] (
    +    @Since("2.2.0") override val uid: String)
    +  extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable {
    +
    +  setDefault(
    +    k -> 2,
    +    maxIter -> 20,
    +    initMode -> "random",
    +    idCol -> "id")
    +
    +  @Since("2.2.0")
    +  override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
    +
    +  @Since("2.2.0")
    +  def this() = this(Identifiable.randomUID("PowerIterationClustering"))
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setPredictionCol(value: String): this.type = set(predictionCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setK(value: Int): this.type = set(k, value)
    +
    +  /** @group expertSetParam */
    +  @Since("2.2.0")
    +  def setInitMode(value: String): this.type = set(initMode, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setMaxIter(value: Int): this.type = set(maxIter, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setIdCol(value: String): this.type = set(idCol, value)
    +
    +  @Since("2.2.0")
    +  override def transform(dataset: Dataset[_]): DataFrame = {
    --- End diff --
    
    One issue with the current implementation is that extra columns are going to be discarded (this is also the case with most GraphFrames algorithms), which breaks the general contract of transformers. After performing the transform, you should join the result back on the missing columns.
    Also, a test for this use case would be useful.
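    For illustration, here is a minimal sketch of the join-back approach; the runPIC helper and the exact shapes below are placeholders for whatever this PR ends up doing, not its actual code:

        // Sketch only: join the cluster assignments back onto the input rows so
        // that any extra input columns survive the transform.
        override def transform(dataset: Dataset[_]): DataFrame = {
          val spark = dataset.sparkSession
          import spark.implicits._

          // Placeholder for the call into the underlying MLlib PIC implementation,
          // producing one Assignment(id, cluster) per input vertex.
          val assignments: RDD[Assignment] = runPIC(dataset)

          val predictions = assignments
            .map(a => (a.id, a.cluster))
            .toDF($(idCol), $(predictionCol))

          // An equi-join on the id column keeps the caller's extra columns in the
          // output, which is what the transformer contract expects.
          dataset.toDF().join(predictions, $(idCol))
        }

    A test could then run transform on an input that carries an extra column (for example a "name" column) and assert that the column is still present, with its original values, in the output.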

