Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19746#discussion_r151954037 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} +import org.apache.spark.sql.{Column, DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.StructType + +/** + * A feature transformer that adds vector size information to a vector column. 
+ */ +@Experimental +@Since("2.3.0") +class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Transformer with HasInputCol with HasHandleInvalid with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("vectSizeHint")) + + @Since("2.3.0") + val size = new Param[Int](this, "size", "Size of vectors in column.", {s: Int => s >= 0}) + + @Since("2.3.0") + def getSize: Int = getOrDefault(size) + + /** @group setParam */ + @Since("2.3.0") + def setSize(value: Int): this.type = set(size, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCol(value: String): this.type = set(inputCol, value) + + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String]( + this, + "handleInvalid", + "How to handle invalid vectors in inputCol, (invalid vectors include nulls and vectors with " + + "the wrong size. The options are `skip` (filter out rows with invalid vectors), `error` " + + "(throw an error) and `optimistic` (don't check the vector size).", + ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids)) + + /** @group setParam */ + @Since("2.3.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + setDefault(handleInvalid, VectorSizeHint.ERROR_INVALID) + + @Since("2.3.0") + override def transform(dataset: Dataset[_]): DataFrame = { + val localInputCol = getInputCol + val localSize = getSize + val localHandleInvalid = getHandleInvalid + + val group = AttributeGroup.fromStructField(dataset.schema(localInputCol)) + if (localHandleInvalid == VectorSizeHint.OPTIMISTIC_INVALID && group.size == localSize) { + dataset.toDF + } else { + val newGroup = if (group.size == localSize) { + // Pass along any existing metadata about vector. 
+ group + } else { + new AttributeGroup(localInputCol, localSize) + } + + val newCol: Column = localHandleInvalid match { + case VectorSizeHint.OPTIMISTIC_INVALID => col(localInputCol) + case VectorSizeHint.ERROR_INVALID => + val checkVectorSize = { vector: Vector => --- End diff -- I think we can simply use the following here: ``` val checkVectorSizeUDF = udf { vector: Vector => ...} checkVectorSizeUDF(col(localInputCol)) ``` That way the code will be clearer.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org