[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20829

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178676396

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)

 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {

+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+    Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+    dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+    try {
+      val first_row = dataset.toDF().select(columns.map(col): _*).first()
+      columns.zip(first_row.toSeq).map {
+        case (c, x) => c -> x.asInstanceOf[Vector].size
+      }.toMap
+    } catch {
+      case e: NullPointerException => throw new NullPointerException(
+        s"""Encountered null value while inferring lengths from the first row. Consider using
+           |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
```

--- End diff --

Oh, I see; you're right. Does this mean the latest change removes the space, though?
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178636550

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
```

--- End diff --

The "\n" should be replaced with " ", and then the message follows. Unless I misunderstand something.
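The spacing question in this thread can be checked in plain Scala, with no Spark involved. The string literal below is copied from the diff; the `columns` value is illustrative. `stripMargin` keeps the newline after the last `|`, so `replaceAll("\n", " ")` turns it into a single trailing space, which is what separates the message from the appended exception text:

```scala
object StripMarginDemo {
  def main(args: Array[String]): Unit = {
    val columns = Seq("v1", "v2")  // illustrative column names

    // Same construction as in the diff under review.
    val prefix =
      s"""Encountered null value while inferring lengths from the first row. Consider using
         |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
         |""".stripMargin.replaceAll("\n", " ")

    // The final newline (after the last `|`) becomes a trailing space,
    // so the appended exception text does not run into the period.
    println(prefix.endsWith(". "))
    println(prefix + "java.lang.NullPointerException")
  }
}
```

Dropping the final `|` line from the literal would remove that trailing space, which appears to be the change the comment asks about.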
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620282

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+      case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+        s"""Can not infer column lengths for 'keep invalid' mode. Consider using
```

--- End diff --

ping
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620200

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+    val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
```

--- End diff --

ping
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620104

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+      case e: NullPointerException => throw new NullPointerException(
+        s"""Encountered null value while inferring lengths from the first row. Consider using
+           |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
```

--- End diff --

ping
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178620142

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+      case e: NoSuchElementException => throw new NoSuchElementException(
+        s"""Encountered empty dataframe while inferring lengths from the first row. Consider using
+           |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+           |""".stripMargin.replaceAll("\n", " ") + e.toString)
```

--- End diff --

ping
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178619977

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+      handleInvalid: String) = {
```

--- End diff --

Ditto
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178619893

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+  private[feature] def getVectorLengthsFromFirstRow(
+    dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
```

--- End diff --

This still isn't correct; look at other MLlib code for examples. It should be:
```
private[feature] def getVectorLengthsFromFirstRow(
    dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
```
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178605922

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+    val vectorCols = $(inputCols).toSeq.filter { c =>
+      schema(c).dataType match {
+        case _: VectorUDT => true
+        case _ => false
+      }
+    }
+    val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid))
+
+    val featureAttributesMap = $(inputCols).toSeq.map { c =>
```

--- End diff --

We need the map to find out the lengths of the vectors; unless there's a way to do this in one mapping pass, I think it might be better to first call a `map` and then a `flatMap`.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177542325

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+      handleInvalid: String) = {
```

--- End diff --

scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543971

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+    val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
```

--- End diff --

nit: In Scala, use camelCase: first_sizes -> firstSizes
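The `(missing_columns.nonEmpty, handleInvalid)` dispatch under review can be sketched in plain Scala, decoupled from Spark. Everything here is illustrative: `resolveLengths` and `inferFromFirstRow` are hypothetical stand-ins, not Spark API; the real code reads sizes from ML attribute groups and, in 'skip' mode, drops null rows before inferring from the first row.

```scala
object LengthResolutionSketch {
  val SkipInvalid = "skip"
  val ErrorInvalid = "error"
  val KeepInvalid = "keep"

  /** knownSizes maps each column to its attribute-group size, or -1 when unknown. */
  def resolveLengths(
      knownSizes: Map[String, Int],
      handleInvalid: String,
      inferFromFirstRow: Seq[String] => Map[String, Int]): Map[String, Int] = {
    val missing = knownSizes.filter(_._2 == -1).keys.toSeq
    val inferred: Map[String, Int] = (missing.nonEmpty, handleInvalid) match {
      case (true, ErrorInvalid) | (true, SkipInvalid) =>
        // The real implementation additionally drops null rows first in 'skip' mode.
        inferFromFirstRow(missing)
      case (true, KeepInvalid) =>
        // 'keep' cannot infer lengths: a row kept with nulls has no vector to measure.
        throw new RuntimeException(
          s"Can not infer column lengths for 'keep' mode; " +
            s"consider VectorSizeHint for: ${missing.mkString("[", ", ", "]")}")
      case (false, _) => Map.empty
    }
    knownSizes.filter(_._2 != -1) ++ inferred
  }
}
```

For example, `resolveLengths(Map("a" -> 3, "b" -> -1), "error", cols => cols.map(_ -> 2).toMap)` infers only the unknown column `b` and leaves the metadata-backed size of `a` untouched.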
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177500915

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
 /**
- * A feature transformer that merges multiple columns into a vector column.
+ * A feature transformer that merges multiple columns into a vector column. This requires one pass
```

--- End diff --

style nit: Move new text here into a new paragraph below. That will give nicer "pyramid-style" formatting with essential info separated from details.
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177542280

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
+  private[feature] def getVectorLengthsFromFirstRow(
+    dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
```

--- End diff --

scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177505970

Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala

```
       case _: VectorUDT =>
-        val group = AttributeGroup.fromStructField(field)
+        val attributeGroup = AttributeGroup.fromStructField(field)
```

--- End diff --

for the future, I'd avoid renaming things like this unless it's really unclear or needed (to make diffs shorter)
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177503206 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with + * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the + * output). Column lengths are taken from the size of ML Attribute Group, which can be set using + * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred + * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. + * Default: "error" + * @group param + */ + @Since("2.4.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +""" +| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with +| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the +| output). Column lengths are taken from the size of ML Attribute Group, which can be set using +| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred +| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. +| """.stripMargin.replaceAll("\n", " "), +ParamValidators.inArray(VectorAssembler.supportedHandleInvalids)) + + setDefault(handleInvalid, VectorAssembler.ERROR_INVALID) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // Schema transformation. 
val schema = dataset.schema -lazy val first = dataset.toDF.first() -val attrs = $(inputCols).flatMap { c => + +val vectorCols = $(inputCols).toSeq.filter { c => --- End diff -- nit: Is toSeq extraneous? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
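The `handleInvalid` param discussed in the diff above has three behaviors for rows containing nulls: drop the row, fail fast, or pad with NaN. A minimal sketch of those semantics, re-expressed in plain Python (not Spark's implementation; `rows` is a hypothetical stand-in for a DataFrame with nullable columns):

```python
import math

def handle_invalid(rows, mode):
    """Hypothetical sketch of the 'skip' / 'error' / 'keep' semantics
    for rows that may contain null (None) entries."""
    if mode == "skip":
        # filter out rows with invalid data
        return [r for r in rows if None not in r]
    if mode == "error":
        # throw an error on the first invalid row
        for r in rows:
            if None in r:
                raise ValueError("null value in row; consider 'skip' or 'keep'")
        return rows
    if mode == "keep":
        # keep the row, substituting NaN for each null entry
        return [[math.nan if v is None else v for v in r] for r in rows]
    raise ValueError(f"unsupported handleInvalid mode: {mode}")
```

The 'keep' branch is the one that requires known column lengths in the real assembler, since a null vector cell must expand to the right number of NaNs.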
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177501836 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- I'd recommend we deal with NaNs now. This PR is already dealing with some NaN cases: Dataset.na.drop handles NaNs in NumericType columns (but not VectorUDT columns). I'm Ok with postponing incorrect vector lengths until later or doing that now since that work will be more separate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177547373 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getVectorLengthsFromFirstRow(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +s"""Can not infer column lengths for 'keep invalid' mode. Consider using --- End diff -- nit: Refer to specific Param value: "Cannot infer column lengths for mode handleInvalid = "keep"" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543316 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543289 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- nit: Put space in between this message and e.toString --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177559339 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { --- End diff -- style: use camelCase --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177559587 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { + val attributeY = new AttributeGroup("y", 2) + val subAttributesOfZ = Array(NumericAttribute.defaultAttr, NumericAttribute.defaultAttr) --- End diff -- unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177547735 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getVectorLengthsFromFirstRow(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +s"""Can not infer column lengths for 'keep invalid' mode. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ")) + case (_, _) => Map.empty +} +group_sizes ++ first_sizes + } + + @Since("1.6.0") override def load(path: String): VectorAssembler = super.load(path) - private[feature] def assemble(vv: Any*): Vector = { + /** + * Returns a UDF that has the required information to assemble each row. --- End diff -- nit: When people say "UDF," they generally mean a Spark SQL UDF. This is just a function, not a SQL UDF. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177504904 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with + * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the + * output). Column lengths are taken from the size of ML Attribute Group, which can be set using + * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred + * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. + * Default: "error" + * @group param + */ + @Since("2.4.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +""" +| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with +| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the +| output). Column lengths are taken from the size of ML Attribute Group, which can be set using +| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred +| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. +| """.stripMargin.replaceAll("\n", " "), +ParamValidators.inArray(VectorAssembler.supportedHandleInvalids)) + + setDefault(handleInvalid, VectorAssembler.ERROR_INVALID) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // Schema transformation. 
val schema = dataset.schema -lazy val first = dataset.toDF.first() -val attrs = $(inputCols).flatMap { c => + +val vectorCols = $(inputCols).toSeq.filter { c => + schema(c).dataType match { +case _: VectorUDT => true +case _ => false + } +} +val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid)) + +val featureAttributesMap = $(inputCols).toSeq.map { c => --- End diff -- I think the flatMap is simpler, or at least a more common pattern in Spark and Scala (rather than having nested sequences which are then flattened). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177558064 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -18,56 +18,68 @@ package org.apache.spark.ml.feature import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, NumericAttribute} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute, NumericAttribute} import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.functions.{col, udf} class VectorAssemblerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import testImplicits._ + @transient var dfWithNulls: Dataset[_] = _ + + override def beforeAll(): Unit = { +super.beforeAll() +dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long, String)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L, null), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L, null), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L, null), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L, null)) + .toDF("id1", "id2", "x", "y", "name", "z", "n", "nulls") + } + test("params") { ParamsSuite.checkParams(new VectorAssembler) } test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Array(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) 
+assert(assemble(Array(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Array(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Array(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Array(1), true)(v)) + intercept[SparkException](assemble(Array(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Array(1, 1, 1, 1), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) assert(v1.isInstanceOf[SparseVector]) -val v2 = assemble(1.0, 2.0, 3.0, Vectors.sparse(1, Array(0), Array(4.0))) +val sv = Vectors.sparse(1, Array(0), Array(4.0)) +val v2 = assemble(Array(1, 1, 1, 1), true)(1.0, 2.0, 3.0, sv) assert(v2.isInstanceOf[DenseVector]) } test("VectorAssembler") { -val df = Seq( - (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 10L) -).toDF("id", "x", "y", "name", "z", "n") +val df = dfWithNulls.filter("id1 == 1").withColumn("id", col("id1")) --- End diff -- nit: If this is for consolidation, I'm actually against this little change since it obscures what this test is doing and moves the input Row farther from the expected output row. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
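The "assemble should compress vectors" test above checks that mostly-zero output becomes a SparseVector while dense output stays dense. A rough Python illustration of that storage choice, assuming a heuristic of the kind Spark's `Vector.compressed` uses (sparse when roughly 1.5x the non-zero count, plus overhead, is smaller than the dense size):

```python
def compressed(values):
    """Hypothetical sketch of the sparse-vs-dense compression decision.
    Returns ("sparse", size, indices, nonzero_values) or ("dense", values)."""
    nnz = sum(1 for v in values if v != 0.0)
    if 1.5 * (nnz + 1.0) < len(values):
        # few non-zeros: (index, value) pairs are cheaper than a full array
        return ("sparse", len(values),
                [i for i, v in enumerate(values) if v != 0.0],
                [v for v in values if v != 0.0])
    return ("dense", list(values))
```

With this heuristic, `[0, 0, 0, 4]` compresses to sparse and `[1, 2, 3, 4]` stays dense, mirroring the `v1`/`v2` assertions in the suite.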
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177543627 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +s"""Encountered empty dataframe while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. 
+ |""".stripMargin.replaceAll("\n", " ") + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], --- End diff -- style: state return value explicitly (This isn't completely consistent in Spark, but we try to in MLlib) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r177560225 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +159,88 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls when keepInvalid is true") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors when keepInvalid is false") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Array(1, 1), false)(1.0, null)) +intercept[SparkException](assemble(Array(1, 2), false)(1.0, null)) +intercept[SparkException](assemble(Array(1), false)(null)) +intercept[SparkException](assemble(Array(2), false)(null)) + } + + test("get lengths functions") { +import org.apache.spark.ml.feature.VectorAssembler._ +val df = dfWithNulls +assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2)) + assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))) + .getMessage.contains("VectorSizeHint")) + assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"), + Seq("y"))).getMessage.contains("VectorSizeHint")) + +assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2)) +assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID)) + .getMessage.contains("VectorSizeHint")) +assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID)) + .getMessage.contains("VectorSizeHint")) + } + + 
test("Handle Invalid should behave properly") { +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = { + val attributeY = new AttributeGroup("y", 2) + val subAttributesOfZ = Array(NumericAttribute.defaultAttr, NumericAttribute.defaultAttr) + val attributeZ = new AttributeGroup( +"z", +Array[Attribute]( + NumericAttribute.defaultAttr.withName("foo"), + NumericAttribute.defaultAttr.withName("bar"))) + val dfWithMetadata = dfWithNulls.withColumn("y", col("y"), attributeY.toMetadata()) +.withColumn("z", col("z"), attributeZ.toMetadata()).filter(additional_filter) + val output = assembler.setHandleInvalid(mode).transform(dfWithMetadata) + output.collect() + output +} +def run_with_first_row(mode: String): Dataset[_] = { --- End diff -- style: Put empty line between functions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176285294 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- also, we just deal with nulls here. NaNs and incorrect length vectors are transmitted transparently. Do we need to test for those? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176280827 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + 
VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) + intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect()) + +// numeric column is all null --- End diff -- was testing extraction of metadata for numeric column (is always 1). Not relevant in new framework. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176267223 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + 
VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) --- End diff -- It fails because the vector size hint is not given; I'm adding a section with VectorSizeHints. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176265864 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], --- End diff -- updated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176266756 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + 
VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- thanks, good idea! this helped me in catching the `drop.na()` bug that might drop everything --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176266213 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- to allow nulls in the column :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176245770 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { --- End diff -- Thanks! We do throw some descriptive error [here](https://github.com/apache/spark/pull/20829/files#diff-9c84e1d27f25714e256cb482069359cfR193), added more description to it and made assertions in test on those messages. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176228007 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- Behavior of options already included, explanation of column length included here, run time information included in the VectorAssembler class's documentation. Thanks for the suggestion, this is super important! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
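The `handleInvalid` options documented in the diff above have three distinct behaviors. Here is an illustrative, Spark-free sketch of those semantics on a simplified row model (`Option[Double]` standing in for nullable cells); the object and method names are hypothetical, and the real param validation lives in `HasHandleInvalid`:

```scala
// "skip" filters out rows containing nulls, "error" throws on the first null,
// and "keep" passes rows through so nulls can later be encoded as NaN.
object HandleInvalidSketch {
  def handleRows(rows: Seq[Seq[Option[Double]]], mode: String): Seq[Seq[Option[Double]]] =
    mode match {
      case "skip"  => rows.filter(_.forall(_.isDefined))   // drop rows with any null
      case "error" => rows.map { r =>
        require(r.forall(_.isDefined), "encountered null value with handleInvalid = error")
        r
      }
      case "keep"  => rows                                  // keep; nulls become NaN downstream
      case other   => throw new IllegalArgumentException(s"unsupported handleInvalid mode: $other")
    }
}
```

For a two-row input where the second row contains a null, "skip" returns one row, "keep" returns both, and "error" throws.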
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176220684 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -37,24 +37,26 @@ class VectorAssemblerSuite test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) +assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Seq(1), true)(v)) + intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) --- End diff -- that's a typo, Thanks for pointing it out! that number is not used in case we do not have nulls, which is why the test passes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908732 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { --- End diff -- make more explicit: + " when keepInvalid = true" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175907866 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -37,24 +37,26 @@ class VectorAssemblerSuite test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) +assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Seq(1), true)(v)) + intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) --- End diff -- We probably want this to fail, right? It expects a Vector of length 4 but is given a Vector of length 1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175904796 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -234,7 +234,7 @@ class StringIndexerModel ( val metadata = NominalAttribute.defaultAttr .withName($(outputCol)).withValues(filteredLabels).toMetadata() // If we are skipping invalid records, filter them out. -val (filteredDataset, keepInvalid) = getHandleInvalid match { --- End diff -- For the record, in general, I would not bother making changes like this. The one exception I do make is IntelliJ style complaints since those can be annoying for developers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175913440 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { --- End diff -- This is great that you're testing this carefully, but I recommend we make sure to pass better exceptions to users. E.g., they won't know what to do with a NullPointerException, so we could instead tell them something like: "Column x in the first row of the dataset has a null entry, but VectorAssembler expected a non-null entry. This can be fixed by explicitly specifying the expected size using VectorSizeHint." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
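The suggestion above is to wrap the low-level `NullPointerException` in a message that tells the user what to do. A minimal sketch of that pattern, with an illustrative method name and `Array[Double]` standing in for the real `Vector` type:

```scala
// Catch the opaque NPE raised when the first row's entry is null and rethrow
// it with an actionable message pointing the user at VectorSizeHint.
object FriendlyErrorSketch {
  def vectorSizeFromFirstRow(firstRowValue: Array[Double], column: String): Int =
    try {
      firstRowValue.length  // stand-in for Vector.size; throws NPE if the entry is null
    } catch {
      case e: NullPointerException => throw new NullPointerException(
        s"Column $column in the first row of the dataset has a null entry, but " +
        "VectorAssembler expected a non-null entry. This can be fixed by explicitly " +
        "specifying the expected size using VectorSizeHint. " + e.toString)
    }
}
```

This is the shape the final code in this PR takes: the rethrown exception keeps its original type so existing `intercept[NullPointerException]` tests still pass, while the message becomes actionable.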
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175912462 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) --- End diff -- I think you can do a direct comparison: ``` assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")) === Map("y" -> 2)) ``` --- - To unsubscribe, e-mail: 
reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910846 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], --- End diff -- Also, it's unclear what this method does until I read the code. Possibly make the method name more explicit ("getVectorLengthsFromFirstRow"), and definitely document that "columns" must all be of Vector type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175909539 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +"Saw null value on the first row: " + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +"Cannot infer vector size from all empty DataFrame" + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getLengthsFromFirst(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getLengthsFromFirst(dataset.na.drop, missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +"Consider using VectorSizeHint for columns: " + missing_columns.mkString("[", ",", "]")) + case (_, _) => 
Map.empty +} +group_sizes ++ first_sizes + } + + @Since("1.6.0") override def load(path: String): VectorAssembler = super.load(path) - private[feature] def assemble(vv: Any*): Vector = { + private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = { --- End diff -- Also, I'd add doc explaining requirements, especially that this assumes that lengths and vv have the same length. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
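The `getLengths` logic in the diff above can be summarized as: use sizes known from column metadata (with -1 marking unknown), fall back to the first row only for the unknown columns ("skip" first drops null rows, "error" uses the raw first row), and under "keep" fail fast with a pointer to VectorSizeHint since no row can be trusted. A simplified, Spark-free sketch of that decision flow, taking plain `Map`s instead of a `Dataset`:

```scala
// Sketch of the metadata-first, first-row-fallback size inference; the
// parameter names and Map-based inputs are illustrative, not the real API.
object GetLengthsSketch {
  def getLengths(
      metadataSizes: Map[String, Int],   // -1 = size unknown from metadata
      firstRowSizes: Map[String, Int],   // sizes observable from a usable first row
      handleInvalid: String): Map[String, Int] = {
    val missing = metadataSizes.filter(_._2 == -1).keys.toSeq
    val inferred: Map[String, Int] =
      if (missing.isEmpty) Map.empty
      else handleInvalid match {
        case "error" | "skip" =>
          missing.map { c =>
            c -> firstRowSizes.getOrElse(c, throw new NoSuchElementException(
              s"cannot infer vector size for column $c"))
          }.toMap
        case "keep" => throw new RuntimeException(
          "Consider using VectorSizeHint for columns: " + missing.mkString("[", ",", "]"))
        case other => throw new IllegalArgumentException(s"unsupported mode: $other")
      }
    metadataSizes ++ inferred
  }
}
```

Note the asymmetry the tests above probe: "keep" refuses to infer at all, because a NaN-padded output must have a size that holds for every row, not just the first one observed.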
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910402 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first --- End diff -- first -> first() --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908930 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) --- End diff -- No need to compare with anything; just call assemble() --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175914154 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + 
VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- Also, if there are "trash" columns not used by VectorAssembler, maybe name them as such and add a few null values in them for better testing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175913755

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
       .filter(vectorUDF($"features") > 1)
       .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+    import org.apache.spark.ml.feature.VectorAssembler.assemble
+    assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN))
+    assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN))
+    assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+    assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+    import org.apache.spark.ml.feature.VectorAssembler.assemble
+    intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+      Vectors.dense(1.0, Double.NaN))
+    intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+      Vectors.dense(1.0, Double.NaN, Double.NaN))
+    intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN))
+    intercept[SparkException](assemble(Seq(2), false)(null) ===
+      Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+    val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
+      (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L),
+      (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+      (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L),
+      (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+    ).toDF("id1", "id2", "x", "y", "name", "z", "n")
+    assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2))
+    intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y")))
+    intercept[NoSuchElementException](
+      VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+    assert(VectorAssembler.getLengths(
+      df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2))
+    intercept[NullPointerException](VectorAssembler.getLengths(
+      df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+    intercept[RuntimeException](VectorAssembler.getLengths(
+      df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+    val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](

--- End diff --

Since this is shared across multiple tests, just make it a shared value. See e.g. https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L55
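The `assemble(lengths, keepInvalid)` semantics these tests pin down can be sketched outside Spark. The following is a minimal Python model of the behavior under discussion, not Spark's implementation — names, types, and the use of plain lists in place of ML `Vector`s are illustrative assumptions. A `None` in a column of declared length n becomes n NaNs under "keep", and raises under "error":

```python
import math

def assemble(lengths, keep_invalid, *values):
    """Model of VectorAssembler.assemble's null handling (hypothetical).

    lengths[i] is the declared width of column i; values[i] is a scalar,
    a vector-like sequence, or None. With keep_invalid, None is padded
    out with NaNs; otherwise it is an error.
    """
    out = []
    for length, v in zip(lengths, values):
        if v is None:
            if not keep_invalid:
                raise ValueError("null value in row")
            out.extend([math.nan] * length)  # pad to declared length
        elif isinstance(v, (int, float)):
            out.append(float(v))             # scalar column contributes one slot
        else:
            out.extend(float(x) for x in v)  # vector column contributes its values
    return out
```

This mirrors the assertions above: `assemble([1, 2], True, 1.0, None)` yields one real value followed by two NaNs, matching `assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)`.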
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175915187

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
+  test("Handle Invalid should behave properly") {
+    val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
+      (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L),
+      (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+      (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L),
+      (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+    ).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+    val assembler = new VectorAssembler()
+      .setInputCols(Array("x", "y", "z", "n"))
+      .setOutputCol("features")
+
+    // behavior when first row has information
+    assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+    intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+    intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+    // numeric column is all null

--- End diff --

Did you want to test:
* extraction of metadata from the first row (which is what this is testing, I believe), or
* transformation on an all-null column (which this never reaches)?
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908248

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with

--- End diff --

It would be good to expand this doc to explain the behavior: how various types of invalid values are treated (null, NaN, incorrect Vector length) and how computationally expensive different options can be.
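For reference, the row-level behavior of the three options being documented can be modeled in a few lines. This is a sketch of the documented semantics on a single nullable numeric column, using a plain Python list in place of a DataFrame column — it is not Spark's implementation:

```python
import math

def handle_invalid(rows, mode):
    """Model of the three handleInvalid modes (hypothetical helper).

    'skip'  -> drop rows whose value is null
    'error' -> raise on the first null (the default in the patch)
    'keep'  -> substitute NaN for nulls
    """
    if mode == "skip":
        return [v for v in rows if v is not None]
    if mode == "error":
        if any(v is None for v in rows):
            raise ValueError("null encountered in input column")
        return list(rows)
    if mode == "keep":
        return [math.nan if v is None else v for v in rows]
    raise ValueError("unsupported handleInvalid mode: " + mode)
```

'skip' and 'error' are cheap per row, while 'keep' additionally requires knowing each vector column's length up front (to pad with the right number of NaNs), which is where the extra cost discussed in this thread comes from.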
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175914695

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
+    val assembler = new VectorAssembler()
+      .setInputCols(Array("x", "y", "z", "n"))
+      .setOutputCol("features")
+
+    // behavior when first row has information
+    assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+    intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())

--- End diff --

Should this fail? I thought it should pad with NaNs.
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175915257

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
+    // numeric column is all null
+    intercept[RuntimeException](
+      assembler.setHandleInvalid("keep").transform(df.filter("id1==3")).count() == 1)
+
+    // vector column is all null

--- End diff --

ditto
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175909880

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
+  test("get lengths function") {
+    val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](

--- End diff --

Why java.lang.Double instead of Double?
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175911314

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+      handleInvalid: String) = {
+    val group_sizes = columns.map { c =>
+      c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+    }.toMap
+    val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+    val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+      case (true, VectorAssembler.ERROR_INVALID) =>
+        getLengthsFromFirst(dataset, missing_columns)
+      case (true, VectorAssembler.SKIP_INVALID) =>
+        getLengthsFromFirst(dataset.na.drop, missing_columns)

--- End diff --

This will drop rows with NA values in extraneous columns. I.e., even if the VectorAssembler is only assembling columns A and B, if there is a NaN in column C, this will drop that row. Pass the list of columns you care about to the drop() method.
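The distinction in this review point can be illustrated with plain Python dicts standing in for DataFrame rows (the column names A/B/C are hypothetical, as in the comment): the drop should consult only the columns being assembled, so a null in an unrelated column does not discard the row.

```python
def drop_null_rows(rows, cols):
    """Keep a row unless one of the *assembled* columns is null.

    Sketch of na.drop restricted to a column subset; a null in any
    extraneous column is ignored.
    """
    return [r for r in rows if all(r[c] is not None for c in cols)]

# Hypothetical rows: the first has a null only in C, the second in A.
rows = [
    {"A": 1.0, "B": 2.0, "C": None},   # survives a drop over ["A", "B"]
    {"A": None, "B": 2.0, "C": 3.0},   # dropped: null in an assembled column
]
```

An unrestricted drop (over all columns) would discard both rows; restricted to `["A", "B"]`, only the second row is dropped.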
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910352

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+      columns: Seq[String]): Map[String, Int] = {
+    try {
+      val first_row = dataset.toDF.select(columns.map(col): _*).first

--- End diff --

Fix (new) IntelliJ style warnings: "toDF" -> "toDF()"
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910188

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+      columns: Seq[String]): Map[String, Int] = {

--- End diff --

scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175906193

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)

-  private[feature] def assemble(vv: Any*): Vector = {
+  private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = {

--- End diff --

nit: Use Array[Int] for faster access
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908774

--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
+  test("assemble should keep nulls") {
...
+  test("assemble should throw errors") {

--- End diff --

similarly: make more explicit: + " when keepInvalid = false"
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175153265

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+          if (isMissingNumAttrs) {
+            val column = dataset.select(c).na.drop()

--- End diff --

Good catch! That name was bothering me too :P @MrBago and I are thinking of another way to do this more efficiently.
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175152022

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+    val (filteredDataset, keepInvalid) = $(handleInvalid) match {
+      case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", $(inputCols)), false)

--- End diff --

Ah, good point! Although I do think that keeping "any" might make it easier to read, but that may not necessarily hold for experienced people :P
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174990264

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+      "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+      "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))

--- End diff --

"in the * output" -> "in the output"
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174993897

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
+    val (filteredDataset, keepInvalid) = $(handleInvalid) match {
+      case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", $(inputCols)), false)

--- End diff --

you can directly use `dataset.na.drop($(inputCols))`
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174991898

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+      "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+      "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     // Schema transformation.
     val schema = dataset.schema
-    lazy val first = dataset.toDF.first()
-    val attrs = $(inputCols).flatMap { c =>
+
+    val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
       val field = schema(c)
-      val index = schema.fieldIndex(c)
       field.dataType match {
-        case DoubleType =>
-          val attr = Attribute.fromStructField(field)
-          // If the input column doesn't have ML attribute, assume numeric.
-          if (attr == UnresolvedAttribute) {
-            Some(NumericAttribute.defaultAttr.withName(c))
-          } else {
-            Some(attr.withName(c))
-          }
-        case _: NumericType | BooleanType =>
-          // If the input column type is a compatible scalar type, assume numeric.
-          Some(NumericAttribute.defaultAttr.withName(c))
         case _: VectorUDT =>
-          val group = AttributeGroup.fromStructField(field)
-          if (group.attributes.isDefined) {
-            // If attributes are defined, copy them with updated names.
-            group.attributes.get.zipWithIndex.map { case (attr, i) =>
+          val attributeGroup = AttributeGroup.fromStructField(field)
+          var length = attributeGroup.size
+          val isMissingNumAttrs = -1 == length
+          if (isMissingNumAttrs && dataset.isStreaming) {
+            // this condition is checked for every column, but should be cheap
+            throw new RuntimeException(
+              s"""
+                 |VectorAssembler cannot dynamically determine the size of vectors for streaming
+                 |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
+                 |used to transform streaming inputs.
+               """.stripMargin.replaceAll("\n", " "))
+          }
+          if (isMissingNumAttrs) {
+            val column = dataset.select(c).na.drop()
+            // column count is a spark job for every column missing num attrs
+            length = (column.count() > 0, $(handleInvalid)) match {
+              // column first is the second spark job for every column missing num attrs
+              case (true, _) => column.first.getAs[Vector](0).size
+              case (false, VectorAssembler.SKIP_INVALID | VectorAssembler.ERROR_INVALID) => 0
+              case (false, _) =>
+                throw new RuntimeException(
+                  s"""
+                     |VectorAssembler cannot determine the size of empty vectors. Consider applying
+                     |VectorSizeHint to ${c} so that this transformer can be used to transform empty
+                     |columns.
+                   """.stripMargin.replaceAll("\n", " "))
--- End diff --

I think in this case, `VectorSizeHint` also cannot help provide the vector size.
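For readers following the thread, the `(column.count() > 0, $(handleInvalid))` match being discussed can be sketched without Spark. This is a hypothetical plain-Python mock of the decision logic only; the function name `infer_column_size` and the `rows` argument are invented for illustration and are not Spark API:

```python
def infer_column_size(rows, handle_invalid):
    """Mock of the (count > 0, handleInvalid) match in the quoted diff.

    rows: the non-null vectors of one column, as lists of floats.
    handle_invalid: one of "skip", "error", "keep".
    """
    if len(rows) > 0:
        # corresponds to column.first.getAs[Vector](0).size
        return len(rows[0])
    if handle_invalid in ("skip", "error"):
        # no rows survive filtering anyway, so the size is moot
        return 0
    # "keep" must emit NaN padding of a known size, but none can be inferred
    raise RuntimeError("cannot determine the size of empty vectors; "
                       "consider applying VectorSizeHint")

print(infer_column_size([[1.0, 2.0, 3.0]], "keep"))  # 3
print(infer_column_size([], "skip"))                 # 0
```

This also illustrates WeichenXu123's point above: for an all-null column under "keep", the data alone cannot supply a size, which is why the real code falls back to an exception suggesting `VectorSizeHint`.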
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174990323

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("1.6.0")
--- End diff --

`@Since("2.4.0")`
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174994214

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+      "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+      "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     // Schema transformation.
     val schema = dataset.schema
-    lazy val first = dataset.toDF.first()
-    val attrs = $(inputCols).flatMap { c =>
+
+    val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
       val field = schema(c)
-      val index = schema.fieldIndex(c)
       field.dataType match {
-        case DoubleType =>
-          val attr = Attribute.fromStructField(field)
-          // If the input column doesn't have ML attribute, assume numeric.
-          if (attr == UnresolvedAttribute) {
-            Some(NumericAttribute.defaultAttr.withName(c))
-          } else {
-            Some(attr.withName(c))
-          }
-        case _: NumericType | BooleanType =>
-          // If the input column type is a compatible scalar type, assume numeric.
-          Some(NumericAttribute.defaultAttr.withName(c))
         case _: VectorUDT =>
-          val group = AttributeGroup.fromStructField(field)
-          if (group.attributes.isDefined) {
-            // If attributes are defined, copy them with updated names.
-            group.attributes.get.zipWithIndex.map { case (attr, i) =>
+          val attributeGroup = AttributeGroup.fromStructField(field)
+          var length = attributeGroup.size
+          val isMissingNumAttrs = -1 == length
+          if (isMissingNumAttrs && dataset.isStreaming) {
+            // this condition is checked for every column, but should be cheap
+            throw new RuntimeException(
+              s"""
+                 |VectorAssembler cannot dynamically determine the size of vectors for streaming
+                 |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
+                 |used to transform streaming inputs.
+               """.stripMargin.replaceAll("\n", " "))
+          }
+          if (isMissingNumAttrs) {
+            val column = dataset.select(c).na.drop()
--- End diff --

* The var name `column` isn't good; `colDataset` is better.
* An optional optimization is to scan the dataset in one pass, counting non-null rows for each "missing num attrs" column.
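The one-pass optimization suggested here, a single aggregation that counts non-null rows for every size-less column at once instead of launching one Spark job per column, might look like the following. This is a hypothetical, Spark-free Python sketch using a list of dict rows in place of a DataFrame; the function name is invented for illustration:

```python
def count_non_null_per_column(rows, columns):
    """Count non-null entries per column in a single pass over the data."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is not None:
                counts[c] += 1
    return counts

rows = [
    {"a": [1.0], "b": [7.0]},
    {"a": None,  "b": [2.0, 3.0]},
    {"a": [4.0], "b": [5.0, 6.0]},
]
print(count_non_null_per_column(rows, ["a", "b"]))  # {'a': 2, 'b': 2 + 1}
```

In Spark terms this would be one job computing all the per-column counts together, rather than a `count()` (and possibly a `first()`) per column, which is the cost the inline comments in the diff are flagging.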
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174990221

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)

+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+    "Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
--- End diff --

`Hhow` -> `How`
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174980520

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
     val metadata = NominalAttribute.defaultAttr
       .withName($(outputCol)).withValues(filteredLabels).toMetadata()
     // If we are skipping invalid records, filter them out.
-    val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

OK, it doesn't matter; no separate PR needed, I think. It's just a minor change.
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174941546

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
     val metadata = NominalAttribute.defaultAttr
       .withName($(outputCol)).withValues(filteredLabels).toMetadata()
     // If we are skipping invalid records, filter them out.
-    val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Thanks for picking this out! I changed this because I was matching on `$(handleInvalid)` in VectorAssembler, and that seems to be the recommended way of doing it. Should I include this in the current PR and add a note, or open a separate PR?
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174675028

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
     val metadata = NominalAttribute.defaultAttr
       .withName($(outputCol)).withValues(filteredLabels).toMetadata()
     // If we are skipping invalid records, filter them out.
-    val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Why does this line need to change?
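The line under discussion only changes how the param value is read (`$(handleInvalid)` versus `getHandleInvalid`); the dispatch it drives stays the same. A hypothetical Python analogue of that skip/keep/error dispatch, with invented names and simplified behavior (real StringIndexerModel operates on DataFrames, not lists):

```python
def filter_for_transform(values, known_labels, handle_invalid):
    """Sketch of the (filteredDataset, keepInvalid) decision:
    returns (values to transform, whether unseen labels are kept)."""
    if handle_invalid == "keep":
        # unseen labels pass through and get a reserved index later
        return values, True
    if handle_invalid == "skip":
        # drop rows whose label was not seen during fitting
        return [v for v in values if v in known_labels], False
    # "error": pass everything through; unseen labels raise during indexing
    return values, False

print(filter_for_transform(["a", "b", "c"], {"a", "b"}, "skip"))
# (['a', 'b'], False)
```

The actual Scala code pattern-matches on the param value and returns the filtered Dataset plus the `keepInvalid` flag in one step, which is why the PR touches this line at all.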