[GitHub] spark issue #22912: [SPARK-25901][CORE] Use only one thread in BarrierTaskCo...
Github user yogeshg commented on the issue: https://github.com/apache/spark/pull/22912 In an offline discussion with @MrBago, we noted that there are at most as many (non-cancelled) `timerTasks` on the `timer` as there are slots. So one thread for managing logging is probably fine; if anything, we should also think about whether we can just use the main thread. This also means that the `timer.purge()` call in the finally block is `O(n + log c) \in O(constant)`, where `n` is the total number of tasks and `c` is the number of cancelled tasks.
[GitHub] spark pull request #22912: [SPARK-25901] Use only one thread in BarrierTaskC...
GitHub user yogeshg opened a pull request: https://github.com/apache/spark/pull/22912 [SPARK-25901] Use only one thread in BarrierTaskContext companion object ## What changes were proposed in this pull request? We now use only one `timer` (and thus one backing thread) in the `BarrierTaskContext` companion object, and instances can add `timerTasks` to that `timer`. ## How was this patch tested? This was tested manually by generating logs and checking that they look the same as before: a partition waiting on another partition for 5 seconds generates 4-5 log messages when the logging frequency is set to 1 second. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogeshg/spark thread Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22912.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22912 commit 63724e43de39f864538538b0c1bd11f0ae06fdf8 Author: Yogesh Garg <1059168+yogeshg@...> Date: 2018-10-30T17:38:25Z move timer task to broader scope and use a currentBarrierStartTime commit 52063971b3879457db02365ed4bd8495161ce111 Author: Yogesh Garg <1059168+yogeshg@...> Date: 2018-10-31T06:00:36Z send timer to the companion object
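For context, a minimal sketch of the pattern this PR describes: a single shared `Timer` owned by the companion object, to which each barrier call adds and later cancels its own `TimerTask`. The names and structure below are illustrative, not the actual Spark source.

```scala
import java.util.{Timer, TimerTask}

object BarrierTaskContextSketch {
  // One timer, and thus one backing thread, shared by all barrier calls in this JVM.
  val timer = new Timer("Barrier task timer for barrier() calls", true)
}

class BarrierTaskContextSketch(logIntervalMs: Long) {

  def barrier(body: => Unit): Unit = {
    val startTime = System.currentTimeMillis()
    val timerTask = new TimerTask {
      override def run(): Unit = {
        val waitedSec = (System.currentTimeMillis() - startTime) / 1000
        println(s"Task has been waiting at the barrier for $waitedSec seconds")
      }
    }
    BarrierTaskContextSketch.timer.schedule(timerTask, logIntervalMs, logIntervalMs)
    try {
      body // block until every partition reaches the barrier
    } finally {
      timerTask.cancel()                     // cancel only this call's task
      BarrierTaskContextSketch.timer.purge() // drop cancelled tasks from the shared queue
    }
  }
}
```

With at most one non-cancelled task per slot on the shared timer, its queue stays small, which is why the `purge()` call in the finally block is cheap.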
[GitHub] spark issue #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff test Pyth...
Github user yogeshg commented on the issue: https://github.com/apache/spark/pull/20904 lgtm, I'll defer to @jkbradley
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179639255 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a +continuous distribution. By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. +:param sampleCol: + Name of sample column in dataset, of any numerical type. +:param distName: + a `string` name for a theoretical distribution, currently only support "norm". +:param params: + a list of `Double` values specifying the parameters to be used for the theoretical + distribution +:return: + A dataframe that contains the Kolmogorov-Smirnov test result for the input sampled data. + This DataFrame will contain a single Row with the following fields: + - `pValue: Double` + - `statistic: Double` + +>>> from pyspark.ml.stat import KolmogorovSmirnovTest +>>> dataset = [[-1.0], [0.0], [1.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 +>>> dataset = [[2.0], [3.0], [4.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 + +.. versionadded:: 2.4.0 + +""" +@staticmethod +@since("2.4.0") +def test(dataset, sampleCol, distName, *params): +""" +Perform a Kolmogorov-Smirnov test using dataset. +""" +sc = SparkContext._active_spark_context +javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest +dataset = _py2java(sc, dataset) +params = [float(param) for param in params] +return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName, --- End diff -- thanks for checking this out! current usage sounds fair! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179278641 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a +continuous distribution. By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. +:param sampleCol: + Name of sample column in dataset, of any numerical type. +:param distName: + a `string` name for a theoretical distribution, currently only support "norm". +:param params: + a list of `Double` values specifying the parameters to be used for the theoretical + distribution +:return: + A dataframe that contains the Kolmogorov-Smirnov test result for the input sampled data. + This DataFrame will contain a single Row with the following fields: + - `pValue: Double` + - `statistic: Double` + +>>> from pyspark.ml.stat import KolmogorovSmirnovTest +>>> dataset = [[-1.0], [0.0], [1.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 +>>> dataset = [[2.0], [3.0], [4.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 + +.. versionadded:: 2.4.0 + +""" +@staticmethod +@since("2.4.0") +def test(dataset, sampleCol, distName, *params): +""" +Perform a Kolmogorov-Smirnov test using dataset. +""" +sc = SparkContext._active_spark_context +javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest +dataset = _py2java(sc, dataset) +params = [float(param) for param in params] +return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName, --- End diff -- do we need to do `_py2java(sc, *)` for `sampleCol` and/or `distName` ? I noticed that in `ChiSquaredTest` but I'm not sure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179267921 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a --- End diff -- We can add a one line description and then a paragraph full of description, this way we'll get [docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.stat.package) consistent with the rest of the tests in the package. ```scala /** ... * Conduct two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous distribution. * * By comparing the largest difference between the empirical cumulative ... */ ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r179283700 --- Diff: python/pyspark/ml/stat.py --- @@ -134,6 +134,63 @@ def corr(dataset, column, method="pearson"): return _java2py(sc, javaCorrObj.corr(*args)) +class KolmogorovSmirnovTest(object): +""" +.. note:: Experimental + +Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a +continuous distribution. By comparing the largest difference between the empirical cumulative +distribution of the sample data and the theoretical distribution we can provide a test for the +the null hypothesis that the sample data comes from that theoretical distribution. + +:param dataset: + a dataset or a dataframe containing the sample of data to test. +:param sampleCol: + Name of sample column in dataset, of any numerical type. +:param distName: + a `string` name for a theoretical distribution, currently only support "norm". +:param params: + a list of `Double` values specifying the parameters to be used for the theoretical + distribution +:return: + A dataframe that contains the Kolmogorov-Smirnov test result for the input sampled data. + This DataFrame will contain a single Row with the following fields: + - `pValue: Double` + - `statistic: Double` + +>>> from pyspark.ml.stat import KolmogorovSmirnovTest +>>> dataset = [[-1.0], [0.0], [1.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 +>>> dataset = [[2.0], [3.0], [4.0]] +>>> dataset = spark.createDataFrame(dataset, ['sample']) +>>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).collect()[0] +>>> round(ksResult.pValue, 3) +1.0 +>>> round(ksResult.statistic, 3) +0.175 + +.. versionadded:: 2.4.0 + +""" +@staticmethod +@since("2.4.0") +def test(dataset, sampleCol, distName, *params): +""" +Perform a Kolmogorov-Smirnov test using dataset. +""" +sc = SparkContext._active_spark_context +javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest +dataset = _py2java(sc, dataset) +params = [float(param) for param in params] +return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName, --- End diff -- Looks like we should be able to use `pyspark.ml.common.callJavaFunc` ```python from pyspark.ml.common import _java2py, _py2java, callJavaFunc ... return callJavaFunc(sc, javaTestObj, dataset, sampleCol, distName, *params) ``` but I am not fully sure if `_jvm().PythonUtils.toSeq()` method will still be required. ð¤ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20970#discussion_r178942791 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala --- @@ -592,4 +593,26 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { assert(features.toArray === a +: b.toArray) } } + + test("SPARK-23562 RFormula handleInvalid should handle invalid values in non-string columns.") { +val d1 = Seq( + (1001L, "a"), + (1002L, "b")).toDF("id1", "c1") +val d2 = Seq[(java.lang.Long, String)]( + (20001L, "x"), + (20002L, "y"), + (null, null)).toDF("id2", "c2") +val dataset = d1.crossJoin(d2) + +def get_output(mode: String): DataFrame = { + val formula = new RFormula().setFormula("c1 ~ id2").setHandleInvalid(mode) + formula.fit(dataset).transform(dataset).select("features", "label") +} + +intercept[SparkException](get_output("error").collect()) + .getMessage contains "Encountered null while assembling a row" --- End diff -- Sure, that makes sense. Yea, I did mean to assert that :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20970: [SPARK-23562][ML] Forward RFormula handleInvalid Param t...
Github user yogeshg commented on the issue: https://github.com/apache/spark/pull/20970
- [ ] send all PRs against subtasks, rather than against the parent task
- [ ] avoid using infix notation for testing `contains` (see the sketch below)
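For the second item, a sketch of the non-infix form, assuming it sits inside the RFormulaSuite test from the diff above (where `get_output` is that test's helper):

```scala
// Hypothetical rewrite of the assertion without infix notation; `get_output` is the
// helper defined in the SPARK-23562 test, and SparkException/intercept come from the suite.
val message = intercept[SparkException](get_output("error").collect()).getMessage
assert(message.contains("Encountered null while assembling a row"))
```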
[GitHub] spark pull request #20970: [SPARK-23562][ML] Forward RFormula handleInvalid ...
GitHub user yogeshg opened a pull request: https://github.com/apache/spark/pull/20970 [SPARK-23562][ML] Forward RFormula handleInvalid Param to VectorAssembler to handle invalid values in non-string columns ## What changes were proposed in this pull request? The `handleInvalid` Param was forwarded to the `VectorAssembler` used by `RFormula`. ## How was this patch tested? Added a test and ran all tests for RFormula and VectorAssembler. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogeshg/spark spark_23562 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20970.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20970 commit 555be7900312cc594dae6f86140bd0f2fbf7cdd9 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-20T01:27:13Z add test case commit b53cbfaa2323da5c46059a6e3439ded84cb23eeb Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-04-03T18:07:15Z pass handleInvalid param to VectorAssembler
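A short usage sketch of the forwarded parameter; the formula and column names here are made up for illustration:

```scala
import org.apache.spark.ml.feature.RFormula

// handleInvalid set on RFormula is forwarded to the underlying VectorAssembler, so
// nulls in non-string feature columns can be skipped, kept as NaN, or raise an error.
val formula = new RFormula()
  .setFormula("label ~ age + income")
  .setHandleInvalid("skip")

// formula.fit(df).transform(df).select("features", "label")  // df: a hypothetical DataFrame
```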
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178636550 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +181,106 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + /** + * Infers lengths of vector columns from the first row of the dataset + * @param dataset the dataset + * @param columns name of vector columns whose lengths need to be inferred + * @return map of column names to lengths + */ + private[feature] def getVectorLengthsFromFirstRow( + dataset: Dataset[_], columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF().select(columns.map(col): _*).first() + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +s"""Encountered null value while inferring lengths from the first row. Consider using + |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}. + |""".stripMargin.replaceAll("\n", " ") + e.toString) --- End diff -- "\n" should be replaced with " " and then a message. Unless I misunderstand something. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
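In other words, the multi-line template is collapsed to a single line before the caught exception's text is appended, roughly as below (a sketch of the construction, not the final wording):

```scala
// Sketch: build a one-line message from a multi-line template, then append the original
// exception's text; `columns` and `cause` stand in for values from the surrounding code.
def nullWhileInferringMessage(columns: Seq[String], cause: Throwable): String = {
  s"""Encountered null value while inferring lengths from the first row. Consider using
     |VectorSizeHint to add metadata for columns: ${columns.mkString("[", ", ", "]")}.
     |""".stripMargin.replaceAll("\n", " ") + cause.toString
}
```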
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r178605922 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +55,64 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with + * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the + * output). Column lengths are taken from the size of ML Attribute Group, which can be set using + * `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred + * from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. + * Default: "error" + * @group param + */ + @Since("2.4.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +""" +| Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with +| invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the +| output). Column lengths are taken from the size of ML Attribute Group, which can be set using +| `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred +| from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'. +| """.stripMargin.replaceAll("\n", " "), +ParamValidators.inArray(VectorAssembler.supportedHandleInvalids)) + + setDefault(handleInvalid, VectorAssembler.ERROR_INVALID) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // Schema transformation. val schema = dataset.schema -lazy val first = dataset.toDF.first() -val attrs = $(inputCols).flatMap { c => + +val vectorCols = $(inputCols).toSeq.filter { c => + schema(c).dataType match { +case _: VectorUDT => true +case _ => false + } +} +val vectorColsLengths = VectorAssembler.getLengths(dataset, vectorCols, $(handleInvalid)) + +val featureAttributesMap = $(inputCols).toSeq.map { c => --- End diff -- We need the map to find out the length of vectors, unless there's a way to do this in one mapping way, I think it might be better than to call first a `map` and then a `flatMap`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176285294 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- also, we just deal with nulls here. NaNs and incorrect length vectors are transmitted transparently. Do we need to test for those? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176280827 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) + intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect()) + +// numeric column is all null --- End diff -- was testing extraction of metadata for numeric column (is always 1). Not relevant in new framework. 
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176267223 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) --- End diff -- it fails because vector size hint is not given, adding a section with VectorSizeHInts --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
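A sketch of the `VectorSizeHint` + `VectorAssembler` combination referred to above, reusing the column names from the test data in the diff (illustrative only, not the test that was added):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// With handleInvalid = "keep", the assembler must know how many NaNs to emit for a
// null vector, so the vector column's size is declared up front with VectorSizeHint.
val sizeHint = new VectorSizeHint()
  .setInputCol("y")
  .setSize(2)
  .setHandleInvalid("optimistic")

val assembler = new VectorAssembler()
  .setInputCols(Array("x", "y", "z", "n"))
  .setOutputCol("features")
  .setHandleInvalid("keep")

val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))
// pipeline.fit(df).transform(df)  // df: the test DataFrame from the diff above
```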
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176265864 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], --- End diff -- updated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176266756 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- thanks, good idea! this helped me in catching the `drop.na()` bug that might drop everything --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176266213 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- to allow nulls in the column :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176245770 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { --- End diff -- Thanks! We do throw some descriptive error [here](https://github.com/apache/spark/pull/20829/files#diff-9c84e1d27f25714e256cb482069359cfR193), added more description to it and made assertions in test on those messages. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176228007 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- Behavior of options already included, explanation of column length included here, run time information included in the VectorAssembler class's documentation. Thanks for the suggestion, this is super important! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r176220684 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -37,24 +37,26 @@ class VectorAssemblerSuite test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) +assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Seq(1), true)(v)) + intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) --- End diff -- that's a typo, Thanks for pointing it out! that number is not used in case we do not have nulls, which is why the test passes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #6452: [SPARK-7198] [MLLIB] VectorAssembler should output...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/6452#discussion_r175548228 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -46,19 +47,59 @@ class VectorAssembler(override val uid: String) def setOutputCol(value: String): this.type = set(outputCol, value) override def transform(dataset: DataFrame): DataFrame = { +// Schema transformation. +val schema = dataset.schema +lazy val first = dataset.first() +val attrs = $(inputCols).flatMap { c => + val field = schema(c) + val index = schema.fieldIndex(c) + field.dataType match { +case DoubleType => + val attr = Attribute.fromStructField(field) + // If the input column doesn't have ML attribute, assume numeric. + if (attr == UnresolvedAttribute) { +Some(NumericAttribute.defaultAttr.withName(c)) + } else { +Some(attr.withName(c)) + } +case _: NumericType | BooleanType => --- End diff -- I had a quick question regarding this, why do we not consider a 3.2 case where we have Scalar type with ML attributes? is it because there's no such thing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175153265 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +51,65 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("1.6.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with + * invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the + * output). + * Default: "error" + * @group param + */ + @Since("1.6.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +"Hhow to handle invalid data (NULL values). Options are 'skip' (filter out rows with " + + "invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN " + + "in the * output).", ParamValidators.inArray(VectorAssembler.supportedHandleInvalids)) + + setDefault(handleInvalid, VectorAssembler.ERROR_INVALID) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // Schema transformation. val schema = dataset.schema -lazy val first = dataset.toDF.first() -val attrs = $(inputCols).flatMap { c => + +val featureAttributesMap: Seq[Seq[Attribute]] = $(inputCols).toSeq.map { c => val field = schema(c) - val index = schema.fieldIndex(c) field.dataType match { -case DoubleType => - val attr = Attribute.fromStructField(field) - // If the input column doesn't have ML attribute, assume numeric. - if (attr == UnresolvedAttribute) { -Some(NumericAttribute.defaultAttr.withName(c)) - } else { -Some(attr.withName(c)) - } -case _: NumericType | BooleanType => - // If the input column type is a compatible scalar type, assume numeric. - Some(NumericAttribute.defaultAttr.withName(c)) case _: VectorUDT => - val group = AttributeGroup.fromStructField(field) - if (group.attributes.isDefined) { -// If attributes are defined, copy them with updated names. -group.attributes.get.zipWithIndex.map { case (attr, i) => + val attributeGroup = AttributeGroup.fromStructField(field) + var length = attributeGroup.size + val isMissingNumAttrs = -1 == length + if (isMissingNumAttrs && dataset.isStreaming) { +// this condition is checked for every column, but should be cheap +throw new RuntimeException( + s""" + |VectorAssembler cannot dynamically determine the size of vectors for streaming + |data. Consider applying VectorSizeHint to ${c} so that this transformer can be + |used to transform streaming inputs. + """.stripMargin.replaceAll("\n", " ")) + } + if (isMissingNumAttrs) { +val column = dataset.select(c).na.drop() --- End diff -- Good catch! That name was bothering me too :P @MrBago and I are thinking of another way to do this more efficiently. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175152022 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -85,18 +120,34 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) } else { // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes // from metadata, check the first row. -val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size) -Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i)) +(0 until length).map { i => NumericAttribute.defaultAttr.withName(c + "_" + i) } + } +case DoubleType => + val attribute = Attribute.fromStructField(field) + attribute match { +case UnresolvedAttribute => + Seq(NumericAttribute.defaultAttr.withName(c)) +case _ => + Seq(attribute.withName(c)) } +case _ : NumericType | BooleanType => + // If the input column type is a compatible scalar type, assume numeric. + Seq(NumericAttribute.defaultAttr.withName(c)) case otherType => throw new SparkException(s"VectorAssembler does not support the $otherType type") } } -val metadata = new AttributeGroup($(outputCol), attrs).toMetadata() - +val featureAttributes = featureAttributesMap.flatten[Attribute] +val lengths = featureAttributesMap.map(a => a.length) +val metadata = new AttributeGroup($(outputCol), featureAttributes.toArray).toMetadata() +val (filteredDataset, keepInvalid) = $(handleInvalid) match { + case StringIndexer.SKIP_INVALID => (dataset.na.drop("any", $(inputCols)), false) --- End diff -- Ah, good point! Although I do think that keeping "any" might make it easier to read, but that may not necessarily hold for experienced people :P --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
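For reference, the two spellings being discussed are equivalent, since "any" is the default criterion for `na.drop`; a small sketch, with `df` standing in for the input Dataset:

```scala
// Both drop a row if any of the listed columns is null; "any" is the default `how`.
val explicit = df.na.drop("any", Seq("x", "y", "z", "n"))
val implied  = df.na.drop(Seq("x", "y", "z", "n"))
```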
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user yogeshg commented on the issue: https://github.com/apache/spark/pull/20829 test this please
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user yogeshg commented on the issue: https://github.com/apache/spark/pull/20829 I fixed the code paths that failed tests and am waiting for @SparkQA. An offline talk with @MrBago suggests that we can perhaps decrease the number of maps in the `transform` method. Looking into that.
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r174941546 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -234,7 +234,7 @@ class StringIndexerModel ( val metadata = NominalAttribute.defaultAttr .withName($(outputCol)).withValues(filteredLabels).toMetadata() // If we are skipping invalid records, filter them out. -val (filteredDataset, keepInvalid) = getHandleInvalid match { --- End diff -- Thanks for picking this out! I changed this because I was matching on `$(handleInvalid)` in VectorAssembler and that seems to be the recommended way of doing this. Should I include this in the current PR and add a note or open a separate PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690] [ML] Add handleinvalid to VectorAss...
GitHub user yogeshg opened a pull request: https://github.com/apache/spark/pull/20829 [SPARK-23690] [ML] Add handleinvalid to VectorAssembler ## What changes were proposed in this pull request? Introduce `handleInvalid` parameter in `VectorAssembler` that can take in `"keep", "skip", "error"` options. "error" throws an error on seeing a row containing a `null`, "skip" filters out all such rows, and "keep" adds relevant number of NaN. "keep" figures out an example to find out what this number of NaN s should be added and throws an error when no such number could be found. ## How was this patch tested? Unit tests are added to check the behavior of `assemble` on specific rows and the transformer is called on `DataFrame`s of different configurations to test different corner cases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogeshg/spark rformula_handleinvalid Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20829.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20829 commit 17883798ee406670e52497a32b6a6f55f3e8fbc4 Author: Bago Amirbekian <bago@...> Date: 2018-03-14T22:42:27Z Better error for streaming dataframes, ensure non-null Vectors in first. commit c34332d656849ff8d2b9fbfe752076d8a37cc430 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-09T01:29:18Z add NaN for null column commit f2f763dc54401f6b8009cd99e42b1eb2891a1f8c Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-14T22:13:04Z get lengths with a map commit 272a806cee85f1028b29da418c5ac8ca27a99cb6 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-14T22:43:12Z wip commit dc99db851b46b7112e04c86c9f3fccc197aa97d2 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-14T22:45:11Z wip commit 61fbcc42891e834b9914def6dadbe1ff24725998 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-14T22:45:24Z wip commit 08b8c048afe2a806bbf8f2ce5017a16ba08997e8 Author: Bago Amirbekian <bago@...> Date: 2018-03-14T22:50:05Z Merge branch 'rformula_handleinvalid' into vectorAssemblerStuff commit 8c98d368739539d1113096c4ae8b81bda27eb950 Author: Bago Amirbekian <bago@...> Date: 2018-03-14T23:55:11Z Merge fixes. commit cb0faba9dbc860cf5550bb59904a02c17ced8a00 Author: Yogesh Garg <1059168+yogeshg@...> Date: 2018-03-15T00:01:50Z Merge pull request #2 from MrBago/vectorAssemblerStuff Vector assembler stuff commit 3c3532c624b796f723720747d4e5453a1316b329 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-15T00:45:20Z fix issues with this implementation commit d29228ceed9479841cf2b8e4994388c6b628f0c1 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-15T02:02:53Z fix bugs; add tests commit c0c0e3df92dc787d9ce9ca632353d86c6457420d Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-15T02:12:59Z clean --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
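A short usage sketch of the new parameter; the column names are made up for illustration:

```scala
import org.apache.spark.ml.feature.VectorAssembler

// "error" (the default) throws on a row containing a null, "skip" filters such rows
// out, and "keep" emits the relevant number of NaN values in the output vector.
val assembler = new VectorAssembler()
  .setInputCols(Array("height", "weight"))
  .setOutputCol("features")
  .setHandleInvalid("keep")

// assembler.transform(df)  // df: a hypothetical input DataFrame
```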
[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...
Github user yogeshg commented on a diff in the pull request: https://github.com/apache/spark/pull/20724#discussion_r172273476 --- Diff: python/pyspark/ml/tests.py --- @@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake): pass +class JavaWrapperMemoryTests(SparkSessionTestCase): + +def test_java_object_gets_detached(self): +df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], +["label", "weight", "features"]) +lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", weightCol="weight", + fitIntercept=False) + +model = lr.fit(df) +summary = model.summary + +self.assertIsInstance(model, JavaWrapper) +self.assertIsInstance(summary, JavaWrapper) +self.assertIsInstance(model, JavaParams) +self.assertNotIsInstance(summary, JavaParams) + +error_no_object = 'Target Object ID does not exist for this gateway' + +self.assertIn("LinearRegression_", model._java_obj.toString()) +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +model.__del__() + +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +model._java_obj.toString() +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +try: +summary.__del__() +except: --- End diff -- `__del__` is not a method of the object class. This test throws an error with earlier code (when `__del__` is in `JavaParams`) because the `LinearRegressionSummary` class did not inherit the del method from any of its ancestors (`JavaWrapper` and `object`). After moving the del method to `JavaWrapper` this line executes. If I remove the try method, then we are testing the condition that "`__del__` method exists && `__del__` method releases memory". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...
GitHub user yogeshg opened a pull request: https://github.com/apache/spark/pull/20724 [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests ## What changes were proposed in this pull request? Move del method from JavaParams to JavaWrapper; add tests ## How was this patch tested? I ran pyspark tests agains `pyspark-ml` module `./python/run-tests --python-executables=$(which python) --modules=pyspark-ml` You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogeshg/spark java_wrapper_memory Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20724.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20724 commit 50acecc3d12778f1f30ca636b6e83163f1fc775a Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-03T00:00:40Z add test case for JavaWrapper that displays memory leak for JavaWrapper but not JavaParams commit d36c1a10cd318d9ddeb2717737248c974a2349f1 Author: Yogesh Garg <yogesh(dot)garg()databricks(dot)com> Date: 2018-03-03T00:01:19Z send the delete method from JavaParams to JavaWrapper --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org