[GitHub] spark issue #22912: [SPARK-25901][CORE] Use only one thread in BarrierTaskCo...

2018-10-31 Thread yogeshg
Github user yogeshg commented on the issue:

https://github.com/apache/spark/pull/22912
  
In an offline discussion with @MrBago, we noted that there are at most as many 
(non-cancelled) `timerTasks` on the `timer` as there are slots. So one thread for 
managing logging is probably fine; if anything, we should also think about whether 
we can just use the main thread. Also, this means that the `timer.purge()` call in 
the finally block is `O(n + c log n)`, effectively constant here, where `n` is the 
total number of tasks and `c` is the number of cancelled tasks.
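
For illustration, a minimal sketch of the single shared `Timer` pattern under discussion, 
using only the standard `java.util.Timer` API; the object and method names below are 
made up and are not the actual `BarrierTaskContext` code:

```scala
import java.util.{Timer, TimerTask}

object BarrierLoggingSketch {
  // One daemon Timer (and thus one backing thread) shared by all callers.
  private val timer = new Timer("Barrier task timer", true)

  def withProgressLogging(intervalMs: Long)(body: => Unit): Unit = {
    val task = new TimerTask {
      override def run(): Unit = println("still waiting at the barrier...")
    }
    timer.schedule(task, intervalMs, intervalMs) // periodic log while waiting
    try {
      body // the actual barrier wait
    } finally {
      task.cancel() // mark this caller's task as cancelled
      timer.purge() // drop cancelled tasks; cheap because few tasks are queued
    }
  }
}
```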


---




[GitHub] spark pull request #22912: [SPARK-25901] Use only one thread in BarrierTaskC...

2018-10-31 Thread yogeshg
GitHub user yogeshg opened a pull request:

https://github.com/apache/spark/pull/22912

[SPARK-25901] Use only one thread in BarrierTaskContext companion object 

## What changes were proposed in this pull request?

Now we use only one `timer` (and thus one backing thread) in the 
`BarrierTaskContext` companion object, and instances add their `timerTasks` to 
that shared `timer`.

## How was this patch tested?

This was tested manually by generating logs and checking that they look the 
same as before: a partition waiting on another partition for 5 seconds 
generates 4-5 log messages when the logging frequency is set to 1 second.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogeshg/spark thread

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22912


commit 63724e43de39f864538538b0c1bd11f0ae06fdf8
Author: Yogesh Garg <1059168+yogeshg@...>
Date:   2018-10-30T17:38:25Z

move timer task to broader scope and use a currentBarrierStartTime

commit 52063971b3879457db02365ed4bd8495161ce111
Author: Yogesh Garg <1059168+yogeshg@...>
Date:   2018-10-31T06:00:36Z

send timer to the companion object




---




[GitHub] spark issue #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff test Pyth...

2018-04-05 Thread yogeshg
Github user yogeshg commented on the issue:

https://github.com/apache/spark/pull/20904
  
lgtm, I'll defer to @jkbradley 


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-05 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179639255
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
+continuous distribution. By comparing the largest difference between 
the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
+:param sampleCol:
+  Name of sample column in dataset, of any numerical type.
+:param distName:
+  a `string` name for a theoretical distribution, currently only 
support "norm".
+:param params:
+  a list of `Double` values specifying the parameters to be used for 
the theoretical
+  distribution
+:return:
+  A dataframe that contains the Kolmogorov-Smirnov test result for the 
input sampled data.
+  This DataFrame will contain a single Row with the following fields:
+  - `pValue: Double`
+  - `statistic: Double`
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
+>>> dataset = [[-1.0], [0.0], [1.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
0.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+>>> dataset = [[2.0], [3.0], [4.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
3.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+
+.. versionadded:: 2.4.0
+
+"""
+@staticmethod
+@since("2.4.0")
+def test(dataset, sampleCol, distName, *params):
+"""
+Perform a Kolmogorov-Smirnov test using dataset.
+"""
+sc = SparkContext._active_spark_context
+javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
+dataset = _py2java(sc, dataset)
+params = [float(param) for param in params]
+return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
--- End diff --

thanks for checking this out! current usage sounds fair!


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-04 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179278641
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
+continuous distribution. By comparing the largest difference between 
the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
+:param sampleCol:
+  Name of sample column in dataset, of any numerical type.
+:param distName:
+  a `string` name for a theoretical distribution, currently only 
support "norm".
+:param params:
+  a list of `Double` values specifying the parameters to be used for 
the theoretical
+  distribution
+:return:
+  A dataframe that contains the Kolmogorov-Smirnov test result for the 
input sampled data.
+  This DataFrame will contain a single Row with the following fields:
+  - `pValue: Double`
+  - `statistic: Double`
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
+>>> dataset = [[-1.0], [0.0], [1.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
0.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+>>> dataset = [[2.0], [3.0], [4.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
3.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+
+.. versionadded:: 2.4.0
+
+"""
+@staticmethod
+@since("2.4.0")
+def test(dataset, sampleCol, distName, *params):
+"""
+Perform a Kolmogorov-Smirnov test using dataset.
+"""
+sc = SparkContext._active_spark_context
+javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
+dataset = _py2java(sc, dataset)
+params = [float(param) for param in params]
+return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
--- End diff --

do we need to do `_py2java(sc, *)` for `sampleCol` and/or `distName` ? I 
noticed that in `ChiSquaredTest` but I'm not sure.


---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-04 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179267921
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
--- End diff --

We can add a one-line description and then a full descriptive paragraph; this way 
we'll get 
[docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.stat.package)
 consistent with the rest of the tests in the package.

```scala
/**
...
 * Conduct two-sided Kolmogorov Smirnov (KS) test for data sampled from a 
continuous distribution.
 * 
 * By comparing the largest difference between the empirical cumulative
...
 */
```




---




[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...

2018-04-04 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20904#discussion_r179283700
  
--- Diff: python/pyspark/ml/stat.py ---
@@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"):
 return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+"""
+.. note:: Experimental
+
+Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
+continuous distribution. By comparing the largest difference between 
the empirical cumulative
+distribution of the sample data and the theoretical distribution we 
can provide a test for the
+the null hypothesis that the sample data comes from that theoretical 
distribution.
+
+:param dataset:
+  a dataset or a dataframe containing the sample of data to test.
+:param sampleCol:
+  Name of sample column in dataset, of any numerical type.
+:param distName:
+  a `string` name for a theoretical distribution, currently only 
support "norm".
+:param params:
+  a list of `Double` values specifying the parameters to be used for 
the theoretical
+  distribution
+:return:
+  A dataframe that contains the Kolmogorov-Smirnov test result for the 
input sampled data.
+  This DataFrame will contain a single Row with the following fields:
+  - `pValue: Double`
+  - `statistic: Double`
+
+>>> from pyspark.ml.stat import KolmogorovSmirnovTest
+>>> dataset = [[-1.0], [0.0], [1.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
0.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+>>> dataset = [[2.0], [3.0], [4.0]]
+>>> dataset = spark.createDataFrame(dataset, ['sample'])
+>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 
3.0, 1.0).collect()[0]
+>>> round(ksResult.pValue, 3)
+1.0
+>>> round(ksResult.statistic, 3)
+0.175
+
+.. versionadded:: 2.4.0
+
+"""
+@staticmethod
+@since("2.4.0")
+def test(dataset, sampleCol, distName, *params):
+"""
+Perform a Kolmogorov-Smirnov test using dataset.
+"""
+sc = SparkContext._active_spark_context
+javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
+dataset = _py2java(sc, dataset)
+params = [float(param) for param in params]
+return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
--- End diff --

Looks like we should be able to use `pyspark.ml.common.callJavaFunc`
```python
from pyspark.ml.common import _java2py, _py2java, callJavaFunc
...

return callJavaFunc(sc, javaTestObj.test, dataset, sampleCol, distName, *params)
```

but I am not fully sure whether the `_jvm().PythonUtils.toSeq()` method will still 
be required. 🤞 


---




[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...

2018-04-03 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20970#discussion_r178942791
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala ---
@@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
 assert(features.toArray === a +: b.toArray)
 }
   }
+
+  test("SPARK-23562 RFormula handleInvalid should handle invalid values in 
non-string columns.") {
+val d1 = Seq(
+  (1001L, "a"),
+  (1002L, "b")).toDF("id1", "c1")
+val d2 = Seq[(java.lang.Long, String)](
+  (20001L, "x"),
+  (20002L, "y"),
+  (null, null)).toDF("id2", "c2")
+val dataset = d1.crossJoin(d2)
+
+def get_output(mode: String): DataFrame = {
+  val formula = new RFormula().setFormula("c1 ~ 
id2").setHandleInvalid(mode)
+  formula.fit(dataset).transform(dataset).select("features", "label")
+}
+
+intercept[SparkException](get_output("error").collect())
+  .getMessage contains "Encountered null while assembling a row"
--- End diff --

Sure, that makes sense. Yea, I did mean to assert that :)
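
For reference, a hedged sketch of the assertion being discussed, reusing `get_output` 
and the message from the quoted test (it assumes the surrounding ScalaTest suite):

```scala
// Capture the intercepted exception and assert on its message instead of
// evaluating `contains` as a discarded expression.
val thrown = intercept[SparkException](get_output("error").collect())
assert(thrown.getMessage.contains("Encountered null while assembling a row"))
```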


---




[GitHub] spark issue #20970: [SPARK-23562][ML] Forward RFormula handleInvalid Param t...

2018-04-03 Thread yogeshg
Github user yogeshg commented on the issue:

https://github.com/apache/spark/pull/20970
  
- [ ] send all PRs against subtasks, rather than against the parent task
- [ ] avoid using infix notation for testing `contains`


---




[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...

2018-04-03 Thread yogeshg
GitHub user yogeshg opened a pull request:

https://github.com/apache/spark/pull/20970

[SPARK-23562][ML] Forward RFormula handleInvalid Param to VectorAssembler 
to handle invalid values in non-string columns

## What changes were proposed in this pull request?

The `handleInvalid` Param is now forwarded to the `VectorAssembler` used by 
`RFormula`; a usage sketch follows below.
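
A hedged usage sketch of the forwarded Param; the data and column names are made up 
for illustration and are not the PR's test code:

```scala
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("rformula-handleinvalid").getOrCreate()
import spark.implicits._

// A null in a non-string feature column used to fail during vector assembly.
val df = Seq[(java.lang.Long, String)]((1L, "a"), (2L, "b"), (null, "b")).toDF("id", "c1")

// With this change, handleInvalid set on RFormula is forwarded to its internal
// VectorAssembler, so such rows can be skipped (or kept) instead of erroring.
val formula = new RFormula()
  .setFormula("c1 ~ id")
  .setHandleInvalid("skip")
formula.fit(df).transform(df).show()
```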

## How was this patch tested?

added a test and ran all tests for RFormula and VectorAssembler


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogeshg/spark spark_23562

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20970


commit 555be7900312cc594dae6f86140bd0f2fbf7cdd9
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-20T01:27:13Z

add test case

commit b53cbfaa2323da5c46059a6e3439ded84cb23eeb
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-04-03T18:07:15Z

pass handleInvalid param to VectorAssembler




---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178636550
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+  /**
+   * Infers lengths of vector columns from the first row of the dataset
+   * @param dataset the dataset
+   * @param columns name of vector columns whose lengths need to be 
inferred
+   * @return map of column names to lengths
+   */
+  private[feature] def getVectorLengthsFromFirstRow(
+  dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = {
+try {
+  val first_row = dataset.toDF().select(columns.map(col): _*).first()
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+s"""Encountered null value while inferring lengths from the first 
row. Consider using
+   |VectorSizeHint to add metadata for columns: 
${columns.mkString("[", ", ", "]")}.
+   |""".stripMargin.replaceAll("\n", " ") + e.toString)
--- End diff --

"\n" should be replaced with " " and then a message. Unless I misunderstand 
something.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-04-02 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r178605922
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+   * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+   * from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.4.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"""
+| Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+| invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+| output). Column lengths are taken from the size of ML Attribute 
Group, which can be set using
+| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column 
lengths can also be inferred
+| from first rows of the data since it is safe to do so but only in 
case of 'error' or 'skip'.
+| """.stripMargin.replaceAll("\n", " "),
+ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val vectorCols = $(inputCols).toSeq.filter { c =>
+  schema(c).dataType match {
+case _: VectorUDT => true
+case _ => false
+  }
+}
+val vectorColsLengths = VectorAssembler.getLengths(dataset, 
vectorCols, $(handleInvalid))
+
+val featureAttributesMap = $(inputCols).toSeq.map { c =>
--- End diff --

We need this map to find out the lengths of the vector columns; unless there's a 
way to do this in a single pass, I think it is better than calling a `map` 
followed by a `flatMap`.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176285294
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

Also, we only deal with nulls here; NaNs and vectors of incorrect length are 
passed through transparently. Do we need to test for those?


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176280827
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+
intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
--- End diff --

This was testing extraction of metadata for a numeric column (whose length is 
always 1). It is not relevant in the new framework.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176267223
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
--- End diff --

It fails because a vector size hint is not given; I am adding a section with 
`VectorSizeHint`s, as sketched below.
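
A hedged sketch of what that section looks like, with the column name and size taken 
from the quoted test; this is illustrative rather than the final test code:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// In "keep" mode the assembler must know the length of a vector column even
// when a value is null, so a VectorSizeHint stage supplies it up front.
val sizeHint = new VectorSizeHint()
  .setInputCol("y")
  .setSize(2)
  .setHandleInvalid("optimistic")
val assembler = new VectorAssembler()
  .setInputCols(Array("x", "y", "z", "n"))
  .setOutputCol("features")
  .setHandleInvalid("keep")
val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))
```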


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176265864
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

updated


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176266756
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Thanks, good idea! This helped me catch the `na.drop()` bug that might drop 
everything.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176266213
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

to allow nulls in the column :)


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176245770
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
--- End diff --

Thanks! We do throw a descriptive error 
[here](https://github.com/apache/spark/pull/20829/files#diff-9c84e1d27f25714e256cb482069359cfR193);
 I added more detail to it and made assertions on those messages in the test.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176228007
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

The behavior of the options is already included, the explanation of column lengths 
is included here, and the runtime information is included in the `VectorAssembler` 
class documentation. Thanks for the suggestion, this is super important!


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-21 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r176220684
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -37,24 +37,26 @@ class VectorAssemblerSuite
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, 
Array.empty))
+assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, 
Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), 
Array(2.0, 1.0)))
+assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Seq(1), true)(v))
+  intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
--- End diff --

That's a typo, thanks for pointing it out! That number is not used when there are 
no nulls, which is why the test passes.


---




[GitHub] spark pull request #6452: [SPARK-7198] [MLLIB] VectorAssembler should output...

2018-03-19 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/6452#discussion_r175548228
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -46,19 +47,59 @@ class VectorAssembler(override val uid: String)
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
   override def transform(dataset: DataFrame): DataFrame = {
+// Schema transformation.
+val schema = dataset.schema
+lazy val first = dataset.first()
+val attrs = $(inputCols).flatMap { c =>
+  val field = schema(c)
+  val index = schema.fieldIndex(c)
+  field.dataType match {
+case DoubleType =>
+  val attr = Attribute.fromStructField(field)
+  // If the input column doesn't have ML attribute, assume numeric.
+  if (attr == UnresolvedAttribute) {
+Some(NumericAttribute.defaultAttr.withName(c))
+  } else {
+Some(attr.withName(c))
+  }
+case _: NumericType | BooleanType =>
--- End diff --

I had a quick question regarding this: why do we not consider a 3.2 case, where we 
have a scalar type with ML attributes? Is it because there's no such thing?


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-16 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175153265
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("1.6.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
+   * invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN in the
+   * output).
+   * Default: "error"
+   * @group param
+   */
+  @Since("1.6.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"Hhow to handle invalid data (NULL values). Options are 'skip' (filter 
out rows with " +
+  "invalid data), 'error' (throw an error), or 'keep' (return relevant 
number of NaN " +
+  "in the * output).", 
ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))
+
+  setDefault(handleInvalid, VectorAssembler.ERROR_INVALID)
+
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
 // Schema transformation.
 val schema = dataset.schema
-lazy val first = dataset.toDF.first()
-val attrs = $(inputCols).flatMap { c =>
+
+val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map 
{ c =>
   val field = schema(c)
-  val index = schema.fieldIndex(c)
   field.dataType match {
-case DoubleType =>
-  val attr = Attribute.fromStructField(field)
-  // If the input column doesn't have ML attribute, assume numeric.
-  if (attr == UnresolvedAttribute) {
-Some(NumericAttribute.defaultAttr.withName(c))
-  } else {
-Some(attr.withName(c))
-  }
-case _: NumericType | BooleanType =>
-  // If the input column type is a compatible scalar type, assume 
numeric.
-  Some(NumericAttribute.defaultAttr.withName(c))
 case _: VectorUDT =>
-  val group = AttributeGroup.fromStructField(field)
-  if (group.attributes.isDefined) {
-// If attributes are defined, copy them with updated names.
-group.attributes.get.zipWithIndex.map { case (attr, i) =>
+  val attributeGroup = AttributeGroup.fromStructField(field)
+  var length = attributeGroup.size
+  val isMissingNumAttrs = -1 == length
+  if (isMissingNumAttrs && dataset.isStreaming) {
+// this condition is checked for every column, but should be 
cheap
+throw new RuntimeException(
+  s"""
+ |VectorAssembler cannot dynamically determine the size of 
vectors for streaming
+ |data. Consider applying VectorSizeHint to ${c} so that 
this transformer can be
+ |used to transform streaming inputs.
+   """.stripMargin.replaceAll("\n", " "))
+  }
+  if (isMissingNumAttrs) {
+val column = dataset.select(c).na.drop()
--- End diff --

Good catch! That name was bothering me too :P
@MrBago  and I are thinking of another way to do this more efficiently.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-16 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175152022
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -85,18 +120,34 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   } else {
 // Otherwise, treat all attributes as numeric. If we cannot 
get the number of attributes
 // from metadata, check the first row.
-val numAttrs = 
group.numAttributes.getOrElse(first.getAs[Vector](index).size)
-Array.tabulate(numAttrs)(i => 
NumericAttribute.defaultAttr.withName(c + "_" + i))
+(0 until length).map { i => 
NumericAttribute.defaultAttr.withName(c + "_" + i) }
+  }
+case DoubleType =>
+  val attribute = Attribute.fromStructField(field)
+  attribute match {
+case UnresolvedAttribute =>
+  Seq(NumericAttribute.defaultAttr.withName(c))
+case _ =>
+  Seq(attribute.withName(c))
   }
+case _ : NumericType | BooleanType =>
+  // If the input column type is a compatible scalar type, assume 
numeric.
+  Seq(NumericAttribute.defaultAttr.withName(c))
 case otherType =>
   throw new SparkException(s"VectorAssembler does not support the 
$otherType type")
   }
 }
-val metadata = new AttributeGroup($(outputCol), attrs).toMetadata()
-
+val featureAttributes = featureAttributesMap.flatten[Attribute]
+val lengths = featureAttributesMap.map(a => a.length)
+val metadata = new AttributeGroup($(outputCol), 
featureAttributes.toArray).toMetadata()
+val (filteredDataset, keepInvalid) = $(handleInvalid) match {
+  case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", 
$(inputCols)), false)
--- End diff --

Ah, good point! Although I do think that keeping "any" might make it easier to 
read, that may not necessarily hold for experienced readers :P


---




[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler

2018-03-15 Thread yogeshg
Github user yogeshg commented on the issue:

https://github.com/apache/spark/pull/20829
  
test this please


---




[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler

2018-03-15 Thread yogeshg
Github user yogeshg commented on the issue:

https://github.com/apache/spark/pull/20829
  
I fixed the code paths that failed tests and am waiting for @SparkQA. An offline 
talk with @MrBago suggests that we can perhaps decrease the number of maps in the 
`transform` method. Looking into that.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-15 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r174941546
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

Thanks for picking this out! I changed this because I was matching on 
`$(handleInvalid)` in `VectorAssembler`, and that seems to be the recommended way 
of doing it. Should I include this in the current PR and add a note, or open a 
separate PR?


---




[GitHub] spark pull request #20829: [SPARK-23690] [ML] Add handleinvalid to VectorAss...

2018-03-14 Thread yogeshg
GitHub user yogeshg opened a pull request:

https://github.com/apache/spark/pull/20829

[SPARK-23690] [ML] Add handleinvalid to VectorAssembler

## What changes were proposed in this pull request?

Introduce a `handleInvalid` parameter in `VectorAssembler` that accepts the 
options `"keep"`, `"skip"`, and `"error"`: "error" throws an error on seeing a row 
containing a `null`, "skip" filters out all such rows, and "keep" emits the 
relevant number of NaNs in the output. In "keep" mode the transformer inspects an 
example row to determine how many NaNs should be added for each column, and throws 
an error when that number cannot be determined.
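
A hedged usage sketch of the new parameter on numeric columns; the toy data is made up 
and is not one of the PR's tests:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("assembler-handleinvalid").getOrCreate()
import spark.implicits._

val df = Seq[(java.lang.Double, java.lang.Double)]((1.0, 2.0), (3.0, null)).toDF("x", "y")

val assembler = new VectorAssembler()
  .setInputCols(Array("x", "y"))
  .setOutputCol("features")

assembler.setHandleInvalid("skip").transform(df).show() // drops the row containing the null
assembler.setHandleInvalid("keep").transform(df).show() // keeps it, assembled as [3.0, NaN]
// The default, "error", would throw a SparkException on the row with the null.
```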

## How was this patch tested?

Unit tests are added to check the behavior of `assemble` on specific rows 
and the transformer is called on `DataFrame`s of different configurations to 
test different corner cases.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogeshg/spark rformula_handleinvalid

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20829


commit 17883798ee406670e52497a32b6a6f55f3e8fbc4
Author: Bago Amirbekian <bago@...>
Date:   2018-03-14T22:42:27Z

Better error for streaming dataframes, ensure non-null Vectors in first.

commit c34332d656849ff8d2b9fbfe752076d8a37cc430
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-09T01:29:18Z

add NaN for null column

commit f2f763dc54401f6b8009cd99e42b1eb2891a1f8c
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-14T22:13:04Z

get lengths with a map

commit 272a806cee85f1028b29da418c5ac8ca27a99cb6
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-14T22:43:12Z

wip

commit dc99db851b46b7112e04c86c9f3fccc197aa97d2
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-14T22:45:11Z

wip

commit 61fbcc42891e834b9914def6dadbe1ff24725998
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-14T22:45:24Z

wip

commit 08b8c048afe2a806bbf8f2ce5017a16ba08997e8
Author: Bago Amirbekian <bago@...>
Date:   2018-03-14T22:50:05Z

Merge branch 'rformula_handleinvalid' into vectorAssemblerStuff

commit 8c98d368739539d1113096c4ae8b81bda27eb950
Author: Bago Amirbekian <bago@...>
Date:   2018-03-14T23:55:11Z

Merge fixes.

commit cb0faba9dbc860cf5550bb59904a02c17ced8a00
Author: Yogesh Garg <1059168+yogeshg@...>
Date:   2018-03-15T00:01:50Z

Merge pull request #2 from MrBago/vectorAssemblerStuff

Vector assembler stuff

commit 3c3532c624b796f723720747d4e5453a1316b329
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-15T00:45:20Z

fix issues with this implementation

commit d29228ceed9479841cf2b8e4994388c6b628f0c1
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-15T02:02:53Z

fix bugs; add tests

commit c0c0e3df92dc787d9ce9ca632353d86c6457420d
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-15T02:12:59Z

clean




---




[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...

2018-03-05 Thread yogeshg
Github user yogeshg commented on a diff in the pull request:

https://github.com/apache/spark/pull/20724#discussion_r172273476
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake):
 pass
 
 
+class JavaWrapperMemoryTests(SparkSessionTestCase):
+
+def test_java_object_gets_detached(self):
+df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], 
[]))],
+["label", "weight", "features"])
+lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", 
weightCol="weight",
+  fitIntercept=False)
+
+model = lr.fit(df)
+summary = model.summary
+
+self.assertIsInstance(model, JavaWrapper)
+self.assertIsInstance(summary, JavaWrapper)
+self.assertIsInstance(model, JavaParams)
+self.assertNotIsInstance(summary, JavaParams)
+
+error_no_object = 'Target Object ID does not exist for this 
gateway'
+
+self.assertIn("LinearRegression_", model._java_obj.toString())
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+model.__del__()
+
+with self.assertRaisesRegexp(py4j.protocol.Py4JError, 
error_no_object):
+model._java_obj.toString()
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+try:
+summary.__del__()
+except:
--- End diff --

`__del__` is not a method of the `object` class. This test throws an error with 
the earlier code (when `__del__` was defined on `JavaParams`) because the 
`LinearRegressionSummary` class did not inherit a `__del__` method from any of its 
ancestors (`JavaWrapper` and `object`). After moving the method to `JavaWrapper`, 
this line executes. If I remove the try/except block, then we are testing the 
condition that "the `__del__` method exists && the `__del__` method releases 
memory".


---




[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...

2018-03-02 Thread yogeshg
GitHub user yogeshg opened a pull request:

https://github.com/apache/spark/pull/20724

[SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; 
add tests

## What changes were proposed in this pull request?

Move del method from JavaParams to JavaWrapper; add tests
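
A simplified, hedged sketch of the pattern; the class body below is illustrative rather 
than the actual pyspark source, and the py4j `detach` call is the standard way to drop a 
gateway-side reference:

```python
from pyspark import SparkContext


class JavaWrapper(object):
    """Illustrative base wrapper that owns a py4j object (not the real class)."""

    def __init__(self, java_obj=None):
        self._java_obj = java_obj

    def __del__(self):
        # Detach the JVM object so py4j can release it. Because this lives on
        # JavaWrapper rather than JavaParams, subclasses that wrap a JVM object
        # but carry no Params (such as training summaries) get the cleanup too.
        sc = SparkContext._active_spark_context
        if sc is not None and self._java_obj is not None:
            sc._gateway.detach(self._java_obj)
```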

## How was this patch tested?

I ran the PySpark tests against the `pyspark-ml` module:
`./python/run-tests --python-executables=$(which python) 
--modules=pyspark-ml`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogeshg/spark java_wrapper_memory

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20724.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20724


commit 50acecc3d12778f1f30ca636b6e83163f1fc775a
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-03T00:00:40Z

add test case for JavaWrapper that displays memory leak for JavaWrapper but 
not JavaParams

commit d36c1a10cd318d9ddeb2717737248c974a2349f1
Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com>
Date:   2018-03-03T00:01:19Z

send the delete method from JavaParams to JavaWrapper




---
