[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20829


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178676396
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

Oh, I see; you're right.  Does this mean the latest change removes the 
space though?


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178636550
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

"\n" should be replaced with " " and then a message. Unless I misunderstand 
something.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620282
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider using
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620200
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620104
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178620142
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ping


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178619977
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
--- End diff --

Ditto


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178619893
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
--- End diff --

This still isn't correct; look at other MLlib code for examples.  It should 
be:
```
private[feature] def getVectorLengthsFromFirstRow(
    dataset: Dataset[_],
    columns: Seq[String]): Map[String, Int] = {
```


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178605922
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
--- End diff --

We need the `map` to find out the lengths of the vectors; unless there's a way to do this in a single mapping step, I think it's better to first call a `map` and then a `flatMap`.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177542325
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation
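
A minimal sketch of the layout being asked for, assuming `getLengths` keeps its current parameters (the explicit `Map[String, Int]` return type echoes a related comment later in this thread):

```
private[feature] def getLengths(
    dataset: Dataset[_],
    columns: Seq[String],
    handleInvalid: String): Map[String, Int] = {
  // body unchanged
}
```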


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543971
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
--- End diff --

nit: In Scala, use camelCase: first_sizes -> firstSizes
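
A tiny illustration of the rename with toy values (not the PR's code):

```
// snake_case -> camelCase, per the Scala style used in Spark
val groupSizes: Map[String, Int] = Map("y" -> -1, "z" -> 2)
val missingColumns: Seq[String] = groupSizes.filter(_._2 == -1).keys.toSeq
val firstSizes: Map[String, Int] = Map("y" -> 2)
```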


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177500915
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -17,26 +17,32 @@
 
 package org.apache.spark.ml.feature
 
+import java.util.NoSuchElementException
+
 import scala.collection.mutable.ArrayBuilder
+import scala.language.existentials
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute}
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
-import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 /**
- * A feature transformer that merges multiple columns into a vector column.
+ * A feature transformer that merges multiple columns into a vector column. This requires one pass
--- End diff --

style nit: Move new text here into a new paragraph below.  That will give 
nicer "pyramid-style" formatting with essential info separated from details.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177542280
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177505970
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
 case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
+  val attribute = Attribute.fromStructField(field)
+  attribute match {
+case UnresolvedAttribute =>
+  Seq(NumericAttribute.defaultAttr.withName(c))
+case _ =>
+  Seq(attribute.withName(c))
   }
 case _: NumericType | BooleanType =>
   // If the input column type is a compatible scalar type, assume numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
+  Seq(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
--- End diff --

for the future, I'd avoid renaming things like this unless it's really 
unclear or needed (to make diffs shorter)


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177503206
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
--- End diff --

nit: Is toSeq extraneous?
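
For context, a sketch of the filter without `toSeq` — `$(inputCols)` is already an `Array[String]`, and `filter` works on arrays directly:

```
val vectorCols = $(inputCols).filter { c =>
  schema(c).dataType.isInstanceOf[VectorUDT]
}
```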


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177501836
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
--- End diff --

I'd recommend we deal with NaNs now.  This PR is already dealing with some 
NaN cases: Dataset.na.drop handles NaNs in NumericType columns (but not 
VectorUDT columns).

I'm Ok with postponing incorrect vector lengths until later or doing that 
now since that work will be more separate.
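
A small sketch of the behavior referenced above (column names are illustrative, taken from the test data):

```
// Dataset.na.drop removes rows where any listed column is null and, for
// numeric columns, NaN; it does not inspect the contents of VectorUDT columns.
val cleaned = dataset.na.drop(Seq("x", "n"))
```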


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177547373
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider using
--- End diff --

nit: Refer to specific Param value: "Cannot infer column lengths for mode 
handleInvalid = "keep""


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543316
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

ditto


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543289
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

nit: Put space in between this message and e.toString
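
One literal way to apply the nit, as a sketch of the quoted catch clause (note the exchange at the top of this thread, which points out the trailing newline-to-space replacement may already supply the separator):

```
case e: NullPointerException => throw new NullPointerException(
  s"""Encountered null value while inferring lengths from the first row. Consider using
     |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
     |""".stripMargin.replaceAll("\n", " ") + " " + e.toString)
```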


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177559339
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = {
--- End diff --

style: use camelCase
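
The camelCase form of the helper's signature (a sketch of the rename only):

```
def runWithMetadata(mode: String, additionalFilter: String = "true"): Dataset[_] = {
```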


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177559587
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == "y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = "true"): Dataset[_] = {
+  val attributeY = new AttributeGroup("y", 2)
+  val subAttributesOfZ = Array(NumericAttribute.defaultAttr, NumericAttribute.defaultAttr)
--- End diff --

unused


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177547735
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getVectorLengthsFromFirstRow(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getVectorLengthsFromFirstRow(dataset.na.drop(missing_columns), missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+s"""Can not infer column lengths for 'keep invalid' mode. Consider using
+   |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " "))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  /**
+   * Returns a UDF that has the required information to assemble each row.
--- End diff --

nit: When people say "UDF," they generally mean a Spark SQL UDF.  This is 
just a function, not a SQL UDF.
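
To illustrate the distinction with a generic sketch (not the PR's code): a plain Scala function becomes a Spark SQL UDF only after wrapping it with `functions.udf`:

```
import org.apache.spark.sql.functions.udf

def addOne(x: Double): Double = x + 1.0  // a plain Scala function
val addOneUdf = udf(addOne _)            // a Spark SQL UDF, usable in DataFrame expressions
```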


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177504904
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
--- End diff --

I think the flatMap is simpler, or at least a more common pattern in Spark 
and Scala (rather than having nested sequences which are then flattened).
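
A hypothetical sketch of the single-`flatMap` shape, assuming the `vectorColsLengths` map computed above (names and expansion details are illustrative, not the PR's final code):

```
val featureAttributes = $(inputCols).flatMap { c =>
  schema(c).dataType match {
    case _: VectorUDT =>
      // one attribute per vector element, using the previously inferred length
      (0 until vectorColsLengths(c)).map(i => NumericAttribute.defaultAttr.withName(c + "_" + i))
    case _ =>
      Seq(NumericAttribute.defaultAttr.withName(c))
  }
}
```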


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177558064
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -18,56 +18,68 @@
 package org.apache.spark.ml.feature
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, NumericAttribute}
+import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute, NumericAttribute}
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.functions.{col, udf}
 
 class VectorAssemblerSuite
  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import testImplicits._
 
+  @transient var dfWithNulls: Dataset[_] = _
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long, String)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L, null),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L, null),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L, null),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L, null))
+  .toDF("id1", "id2", "x", "y", "name", "z", "n", "nulls")
+  }
+
   test("params") {
 ParamsSuite.checkParams(new VectorAssembler)
   }
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Array(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
+assert(assemble(Array(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
+assert(assemble(Array(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Array(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Array(1), true)(v))
+  intercept[SparkException](assemble(Array(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Array(1, 1, 1, 1), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
 assert(v1.isInstanceOf[SparseVector])
-val v2 = assemble(1.0, 2.0, 3.0, Vectors.sparse(1, Array(0), 
Array(4.0)))
+val sv = Vectors.sparse(1, Array(0), Array(4.0))
+val v2 = assemble(Array(1, 1, 1, 1), true)(1.0, 2.0, 3.0, sv)
 assert(v2.isInstanceOf[DenseVector])
   }
 
   test("VectorAssembler") {
-val df = Seq(
-  (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), 
Array(3.0)), 10L)
-).toDF("id", "x", "y", "name", "z", "n")
+val df = dfWithNulls.filter("id1 == 1").withColumn("id", col("id1"))
--- End diff --

nit: If this is for consolidation, I'm actually against this little change 
since it obscures what this test is doing and moves the input Row farther from 
the expected output row.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177543627
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+s"""Encountered empty dataframe while inferring lengths from the 
first row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
--- End diff --

style: state the return type explicitly. (This isn't completely consistent in 
Spark, but we try to in MLlib.)
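
For example (a generic sketch, not the PR's code):

```
// Inferred return type (discouraged here):
def getLengthsInferred(sizes: Map[String, Int]) = sizes.filter(_._2 > 0)

// Explicit return type (preferred in MLlib):
def getLengthsExplicit(sizes: Map[String, Int]): Map[String, Int] = sizes.filter(_._2 > 0)
```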


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-27 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r177560225
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +159,88 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls when keepInvalid is true") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Array(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Array(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Array(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Array(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors when keepInvalid is false") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Array(1, 1), false)(1.0, null))
+intercept[SparkException](assemble(Array(1, 2), false)(1.0, null))
+intercept[SparkException](assemble(Array(1), false)(null))
+intercept[SparkException](assemble(Array(2), false)(null))
+  }
+
+  test("get lengths functions") {
+import org.apache.spark.ml.feature.VectorAssembler._
+val df = dfWithNulls
+assert(getVectorLengthsFromFirstRow(df, Seq("y")) === Map("y" -> 2))
+
assert(intercept[NullPointerException](getVectorLengthsFromFirstRow(df.sort("id2"),
 Seq("y")))
+  .getMessage.contains("VectorSizeHint"))
+
assert(intercept[NoSuchElementException](getVectorLengthsFromFirstRow(df.filter("id1
 > 4"),
+  Seq("y"))).getMessage.contains("VectorSizeHint"))
+
+assert(getLengths(df.sort("id2"), Seq("y"), SKIP_INVALID).exists(_ == 
"y" -> 2))
+assert(intercept[NullPointerException](getLengths(df.sort("id2"), 
Seq("y"), ERROR_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+assert(intercept[RuntimeException](getLengths(df.sort("id2"), 
Seq("y"), KEEP_INVALID))
+  .getMessage.contains("VectorSizeHint"))
+  }
+
+  test("Handle Invalid should behave properly") {
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+def run_with_metadata(mode: String, additional_filter: String = 
"true"): Dataset[_] = {
+  val attributeY = new AttributeGroup("y", 2)
+  val subAttributesOfZ = Array(NumericAttribute.defaultAttr, 
NumericAttribute.defaultAttr)
+  val attributeZ = new AttributeGroup(
+"z",
+Array[Attribute](
+  NumericAttribute.defaultAttr.withName("foo"),
+  NumericAttribute.defaultAttr.withName("bar")))
+  val dfWithMetadata = dfWithNulls.withColumn("y", col("y"), 
attributeY.toMetadata())
+.withColumn("z", col("z"), 
attributeZ.toMetadata()).filter(additional_filter)
+  val output = 
assembler.setHandleInvalid(mode).transform(dfWithMetadata)
+  output.collect()
+  output
+}
+def run_with_first_row(mode: String): Dataset[_] = {
--- End diff --

style: Put an empty line between functions.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176285294
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

Also, we only deal with nulls here; NaNs and incorrect-length vectors are 
passed through transparently. Do we need to test for those?
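
As a spark-shell sketch of that pass-through (assumes a SparkSession named 
`spark`; the column names are made up):

```
import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._

// NaN is a valid Double, so it is assembled as-is; only null triggers handleInvalid.
val df = Seq((1.0, Double.NaN), (2.0, 3.0)).toDF("a", "b")
new VectorAssembler()
  .setInputCols(Array("a", "b"))
  .setOutputCol("features")
  .transform(df)
  .show(false)   // first row keeps NaN: [1.0,NaN]
```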


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176280827
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+
intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
--- End diff --

This was testing extraction of metadata for a numeric column (whose length is 
always 1). It's not relevant in the new framework.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176267223
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
--- End diff --

It fails because a vector size hint is not given; adding a section with 
VectorSizeHint.
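
Roughly like this (a sketch continuing the test's `df` and `assembler`; 
VectorSizeHint's 'optimistic' mode skips validation and only attaches size 
metadata):

```
import org.apache.spark.ml.feature.VectorSizeHint

// Declare the size of "y" up front so 'keep' can pad nulls with NaN
// without inferring the length from the first row.
val dfWithSizeHint = new VectorSizeHint()
  .setInputCol("y")
  .setSize(2)
  .setHandleInvalid("optimistic")
  .transform(df)
assembler.setHandleInvalid("keep").transform(dfWithSizeHint).collect()
```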


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176265864
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

updated


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176266756
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Thanks, good idea! This helped me catch the `na.drop()` bug that might drop 
everything.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176266213
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

to allow nulls in the column :)


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176245770
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
--- End diff --

Thanks! We do throw a descriptive error 
[here](https://github.com/apache/spark/pull/20829/files#diff-9c84e1d27f25714e256cb482069359cfR193);
I added more description to it and made the tests assert on those messages.
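
The assertion pattern ends up looking like this (sketch, reusing the quoted 
test's `df`):

```
val e = intercept[NullPointerException] {
  VectorAssembler.getVectorLengthsFromFirstRow(df.sort("id2"), Seq("y"))
}
assert(e.getMessage.contains("VectorSizeHint"))
```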


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176228007
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

The behavior of the options is already included, an explanation of column 
lengths is included here, and run-time information is included in the 
VectorAssembler class's documentation. Thanks for the suggestion, this is super 
important!


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176220684
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -37,24 +37,26 @@ class VectorAssemblerSuite
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, 
Array.empty))
+assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, 
Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), 
Array(2.0, 1.0)))
+assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Seq(1), true)(v))
+  intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
--- End diff --

That's a typo, thanks for pointing it out! That number is not used when there 
are no nulls, which is why the test passes.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908732
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
--- End diff --

make more explicit: + " when keepInvalid = true"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175907866
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -37,24 +37,26 @@ class VectorAssemblerSuite
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, 
Array.empty))
+assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, 
Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), 
Array(2.0, 1.0)))
+assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Seq(1), true)(v))
+  intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
--- End diff --

We probably want this to fail, right?  It expects a Vector of length 4 but 
is given a Vector of length 1.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175904796
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

For the record, in general I would not bother making changes like this. The 
one exception I make is for IntelliJ style complaints, since those can be 
annoying for developers.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175913440
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
--- End diff --

It's great that you're testing this carefully, but I recommend we make sure to 
pass better exceptions to users. E.g., they won't know what to do with a 
NullPointerException, so we could instead tell them something like: "Column x 
in the first row of the dataset has a null entry, but VectorAssembler expected 
a non-null entry. This can be fixed by explicitly specifying the expected size 
using VectorSizeHint."


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175912462
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
--- End diff --

I think you can do a direct comparison:
```
assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")) === Map("y" -> 2))
```


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910846
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

Also, it's unclear what this method does until I read the code.  Possibly 
make the method name more explicit ("getVectorLengthsFromFirstRow"), and 
definitely document that "columns" must all be of Vector type.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175909539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+"Consider using VectorSizeHint for columns: " + 
missing_columns.mkString("[", ",", "]"))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  private[feature] def assemble(lengths: Seq[Int], keepInvalid: 
Boolean)(vv: Any*): Vector = {
--- End diff --

Also, I'd add doc explaining requirements, especially that this assumes 
that lengths and vv have the same length.
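
E.g., a standalone sketch of the precondition being asked for (not the merged 
code):

```
// Fail fast, with a clear message, when the caller passes mismatched arities.
def checkArity(lengths: Seq[Int], vv: Seq[Any]): Unit =
  require(lengths.length == vv.length,
    s"assemble expects ${lengths.length} input values but got ${vv.length}")

checkArity(Seq(1, 2), Seq(0.0, null))  // ok: 2 lengths, 2 values
```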


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910402
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
--- End diff --

first -> first()


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908930
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
--- End diff --

No need to compare with anything; just call assemble()


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175914154
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Also, if there are "trash" columns not used by VectorAssembler, maybe name 
them as such and add a few null values in them for better testing.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175913755
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Since this is shared across multiple tests, just make it a shared value.  
See e.g. 
https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L55


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175915187
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+
intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
--- End diff --

Did you want to test:
* extraction of metadata from the first row (which is what this is testing, 
I believe), or
* transformation on an all-null column (which this never reaches)?


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908248
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

It would be good to expand this doc to explain the behavior: how various 
types of invalid values are treated (null, NaN, incorrect Vector length) and 
how computationally expensive different options can be.
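
A sketch of what the expanded doc could cover (wording assumed; it summarizes 
behavior discussed elsewhere in this thread):

```
/**
 * Param for how to handle invalid data (NULL values). Options are:
 *  - 'error' (default): throw an exception when a null is encountered.
 *  - 'skip': filter out rows containing nulls (a cheap row filter).
 *  - 'keep': output NaN in place of nulls; vector columns without size
 *    metadata then require reading the first row (or a VectorSizeHint)
 *    to determine output vector lengths, which costs an extra Spark job.
 */
```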


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175914695
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
--- End diff --

Should this fail?  I thought it should pad with NaNs.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175915257
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
+intercept[RuntimeException](
+  assembler.setHandleInvalid("keep").transform(df.filter("id1==3")).count() == 1)
+
+// vector column is all null
--- End diff --

ditto


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175909880
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
--- End diff --

Why java.lang.Double instead of Double?
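
(Presumably because scala.Double is a primitive and cannot hold null, while the boxed java.lang.Double can — which the null entries in this fixture require. A minimal illustration:)

```scala
// compiles: java.lang.Double is a reference type, so null is a valid entry
val boxed: Seq[(Long, java.lang.Double)] = Seq((1L, 0.0), (2L, null))

// does not compile: scala.Double is a primitive and cannot be null
// val primitive: Seq[(Long, Double)] = Seq((1L, 0.0), (2L, null))
```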


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175911314
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
--- End diff --

This will drop rows with NA values in extraneous columns. I.e., even if the 
VectorAssembler is only assembling columns A and B, if there is a NaN in 
column C, this will drop that row. Pass the list of columns you care about to 
the drop() method.
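
Concretely, a sketch (with hypothetical columns A, B, and C):

```scala
// na.drop() with no arguments considers every column, so a null/NaN in the
// unrelated column C would also drop the row; restricting the drop to the
// assembled columns keeps such rows:
val assembledCols = Seq("A", "B")     // the columns VectorAssembler actually uses
dataset.na.drop(assembledCols)        // only nulls in A or B drop a row
// instead of: dataset.na.drop()      // a null anywhere, including C, drops it
```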


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910352
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
--- End diff --

Fix (new) IntelliJ style warnings: "toDF" -> "toDF()"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910188
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation
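
i.e., something like (method headers only, bodies elided):

```scala
// current, aligned-with-the-paren style:
private[feature] def getLengthsFromFirst(dataset: Dataset[_],
                                         columns: Seq[String]): Map[String, Int] = {
  // ...
}

// Spark style: first argument on the next line, indented +4 spaces:
private[feature] def getLengthsFromFirst(
    dataset: Dataset[_],
    columns: Seq[String]): Map[String, Int] = {
  // ...
}
```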


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175906193
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException(
+"Consider using VectorSizeHint for columns: " + 
missing_columns.mkString("[", ",", "]"))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = {
--- End diff --

nit: Use Array[Int] for faster access
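
(The nit in a nutshell: `Seq` makes no promise about indexed-access cost, while `Array[Int]` is a flat primitive array — which matters in a helper invoked once per row:)

```scala
val asSeq: Seq[Int] = List(1, 2, 3)      // a List: asSeq(2) walks the list, O(n)
val asArray: Array[Int] = Array(1, 2, 3) // contiguous, unboxed: asArray(2) is O(1)
```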


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908774
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("assemble should throw errors") {
--- End diff --

similarly: make more explicit: + " when keepInvalid = false"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-16 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175153265
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+  "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+  "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
-case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
-  }
-case _: NumericType | BooleanType =>
-  // If the input column type is a compatible scalar type, assume numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
+  var length = attributeGroup.size
+  val isMissingNumAttrs = -1 == length
+  if (isMissingNumAttrs && dataset.isStreaming) {
+// this condition is checked for every column, but should be cheap
+throw new RuntimeException(
+  s"""
+ |VectorAssembler cannot dynamically determine the size of vectors for streaming
+ |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
+ |used to transform streaming inputs.
+   """.stripMargin.replaceAll("\n", " "))
+  }
+  if (isMissingNumAttrs) {
+val column = dataset.select(c).na.drop()
--- End diff --

Good catch! That name was bothering me too :P
@MrBago and I are thinking of another way to do this more efficiently.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-16 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175152022
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -85,18 +120,34 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   } else {
 // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes
 // from metadata, check the first row.
-val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size)
-Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i))
+(0 until length).map { i => NumericAttribute.defaultAttr.withName(c + "_" + i) }
+  }
+case DoubleType =>
+  val attribute = Attribute.fromStructField(field)
+  attribute match {
+case UnresolvedAttribute =>
+  Seq(NumericAttribute.defaultAttr.withName(c))
+case _ =>
+  Seq(attribute.withName(c))
   }
+case _ : NumericType | BooleanType =>
+  // If the input column type is a compatible scalar type, assume numeric.
+  Seq(NumericAttribute.defaultAttr.withName(c))
 case otherType =>
   throw new SparkException(s"VectorAssembler does not support the 
$otherType type")
   }
 }
-val metadata = new AttributeGroup($(outputCol), attrs).toMetadata()
-
+val featureAttributes = featureAttributesMap.flatten[Attribute]
+val lengths = featureAttributesMap.map(a => a.length)
+val metadata = new AttributeGroup($(outputCol), featureAttributes.toArray).toMetadata()
+val (filteredDataset, keepInvalid) = $(handleInvalid) match {
+  case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", $(inputCols)), false)
--- End diff --

Ah, good point! I do think that keeping "any" might make it easier to read, 
though that may not necessarily hold for experienced people :P
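
(For the record, the two spellings are equivalent, since "any" is the default drop criterion:)

```scala
val cols = Seq("x", "y")       // hypothetical input columns, for illustration
dataset.na.drop("any", cols)   // explicit: drop rows with a null/NaN in any of cols
dataset.na.drop(cols)          // same behavior, shorter
```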



---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174990264
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+  "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+  "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
--- End diff --

"in the * output" -> "in the output"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174993897
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -85,18 +120,34 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   } else {
 // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes
 // from metadata, check the first row.
-val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size)
-Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i))
+(0 until length).map { i => NumericAttribute.defaultAttr.withName(c + "_" + i) }
+  }
+case DoubleType =>
+  val attribute = Attribute.fromStructField(field)
+  attribute match {
+case UnresolvedAttribute =>
+  Seq(NumericAttribute.defaultAttr.withName(c))
+case _ =>
+  Seq(attribute.withName(c))
   }
+case _ : NumericType | BooleanType =>
+  // If the input column type is a compatible scalar type, assume numeric.
+  Seq(NumericAttribute.defaultAttr.withName(c))
 case otherType =>
   throw new SparkException(s"VectorAssembler does not support the 
$otherType type")
   }
 }
-val metadata = new AttributeGroup($(outputCol), attrs).toMetadata()
-
+val featureAttributes = featureAttributesMap.flatten[Attribute]
+val lengths = featureAttributesMap.map(a => a.length)
+val metadata = new AttributeGroup($(outputCol), featureAttributes.toArray).toMetadata()
+val (filteredDataset, keepInvalid) = $(handleInvalid) match {
+  case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", $(inputCols)), false)
--- End diff --

you can directly use `dataset.na.drop($(inputCols))`


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174991898
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+  "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+  "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
-case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
-  }
-case _: NumericType | BooleanType =>
-  // If the input column type is a compatible scalar type, assume numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
+  var length = attributeGroup.size
+  val isMissingNumAttrs = -1 == length
+  if (isMissingNumAttrs && dataset.isStreaming) {
+// this condition is checked for every column, but should be cheap
+throw new RuntimeException(
+  s"""
+ |VectorAssembler cannot dynamically determine the size of vectors for streaming
+ |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
+ |used to transform streaming inputs.
+   """.stripMargin.replaceAll("\n", " "))
+  }
+  if (isMissingNumAttrs) {
+val column = dataset.select(c).na.drop()
+// column count is a spark job for every column missing num attrs
+length = (column.count() > 0, $(handleInvalid)) match {
+  // column first is the second spark job for every column missing num attrs
+  case (true, _) => column.first.getAs[Vector](0).size
+  case (false, VectorAssembler.SKIP_INVALID | VectorAssembler.ERROR_INVALID) => 0
+  case (false, _) =>
+throw new RuntimeException(
+  s"""
+ |VectorAssembler cannot determine the size of empty vectors. Consider applying
+ |VectorSizeHint to ${c} so that this transformer can be used to transform empty
+ |columns.
+   """.stripMargin.replaceAll("\n", " "))
--- End diff --

I think in this case, `VectorSizeHint` also cannot help to provide the 
vector size.
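
(For reference, a sketch of the suggestion in the error message — `VectorSizeHint` writes the declared size into the column's ML metadata, so the open question is whether that metadata alone is enough once every row of the column is null:)

```scala
import org.apache.spark.ml.feature.VectorSizeHint

// declare the size of vector column "y" up front via metadata
val sized = new VectorSizeHint()
  .setInputCol("y")
  .setSize(2)
  .transform(df)
```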


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174990323
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
--- End diff --

`@Since("2.4.0")`


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174994214
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
+  "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " +
+  "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
-case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
-  }
-case _: NumericType | BooleanType =>
-  // If the input column type is a compatible scalar type, assume numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
+  var length = attributeGroup.size
+  val isMissingNumAttrs = -1 == length
+  if (isMissingNumAttrs && dataset.isStreaming) {
+// this condition is checked for every column, but should be cheap
+throw new RuntimeException(
+  s"""
+ |VectorAssembler cannot dynamically determine the size of vectors for streaming
+ |data. Consider applying VectorSizeHint to ${c} so that this transformer can be
+ |used to transform streaming inputs.
+   """.stripMargin.replaceAll("\n", " "))
+  }
+  if (isMissingNumAttrs) {
+val column = dataset.select(c).na.drop()
--- End diff --

* The var name `column` isn't good. `colDataset` is better.

* An optional optimization is to scan the dataset once, counting non-null rows 
for each "missing num attrs" column. A sketch of that follows below.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174990221
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " +
--- End diff --

Hhow -> How


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174980520
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

OK, it doesn't matter; no need for a separate PR, I think. It's just a minor change.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174941546
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Thanks for picking this out! I changed this because I was matching on 
`$(handleInvalid)` in VectorAssembler, and that seems to be the recommended way 
of doing this. Should I include this in the current PR and add a note, or open 
a separate PR?
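
(For context — a toy sketch, not Spark code, showing that the two accessors are interchangeable; `HasHandleInvalid` defines `getHandleInvalid` as exactly `$(handleInvalid)`, so the change is stylistic:)

```scala
import org.apache.spark.ml.param.{Param, ParamMap, Params}

class HandleInvalidDemo(override val uid: String) extends Params {
  val handleInvalid = new Param[String](this, "handleInvalid", "how to handle invalid data")
  setDefault(handleInvalid -> "error")
  def getHandleInvalid: String = $(handleInvalid)  // what the shared trait provides
  def viaGetter: String = getHandleInvalid         // "error"
  def viaParam: String = $(handleInvalid)          // "error" -- identical value
  override def copy(extra: ParamMap): Params = defaultCopy(extra)
}
```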


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-14 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174675028
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Why does this line need to change?


---
