Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d6cdc699e -> ab1b5d921


[SPARK-22797][PYSPARK] Bucketizer support multi-column

## What changes were proposed in this pull request?
Add multi-column support to Bucketizer on the Python side.
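For illustration, a minimal sketch of the new multi-column usage, mirroring the doctest added in this patch (assumes a running SparkSession bound to `spark`):

```python
from pyspark.ml.feature import Bucketizer

# Two continuous input columns are bucketized in a single pass.
df = spark.createDataFrame(
    [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3), (1.5, float("nan"))],
    ["values1", "values2"])

bucketizer = Bucketizer(
    splitsArray=[[-float("inf"), 0.5, 1.4, float("inf")],  # splits for values1
                 [-float("inf"), 0.5, float("inf")]],       # splits for values2
    inputCols=["values1", "values2"],
    outputCols=["buckets1", "buckets2"])

# 'keep' routes invalid values (NaN) into an extra bucket instead of failing.
bucketizer.setHandleInvalid("keep").transform(df).show()
```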

## How was this patch tested?
Existing tests and newly added tests.

Author: Zheng RuiFeng <ruife...@foxmail.com>

Closes #19892 from zhengruifeng/20542_py.

(cherry picked from commit c22eaa94e85aaac649566495dcf763a5de3c8d06)
Signed-off-by: Nick Pentreath <ni...@za.ibm.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab1b5d92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab1b5d92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab1b5d92

Branch: refs/heads/branch-2.3
Commit: ab1b5d921b395cb7df3a3a2c4a7e5778d98e6f01
Parents: d6cdc69
Author: Zheng RuiFeng <ruife...@foxmail.com>
Authored: Fri Jan 26 12:28:27 2018 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Fri Jan 26 12:28:39 2018 +0200

----------------------------------------------------------------------
 python/pyspark/ml/feature.py        | 105 +++++++++++++++++++++++--------
 python/pyspark/ml/param/__init__.py |  10 +++
 python/pyspark/ml/tests.py          |   9 +++
 3 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab1b5d92/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index da85ba7..fdc7787 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -317,26 +317,33 @@ class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable)
 
 
 @inherit_doc
-class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
-                 JavaMLReadable, JavaMLWritable):
-    """
-    Maps a column of continuous features to a column of feature buckets.
-
-    >>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
-    >>> df = spark.createDataFrame(values, ["values"])
+class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOutputCols,
+                 HasHandleInvalid, JavaMLReadable, JavaMLWritable):
+    """
+    Maps a column of continuous features to a column of feature buckets. Since 2.3.0,
+    :py:class:`Bucketizer` can map multiple columns at once by setting the :py:attr:`inputCols`
+    parameter. Note that when both the :py:attr:`inputCol` and :py:attr:`inputCols` parameters
+    are set, an Exception will be thrown. The :py:attr:`splits` parameter is only used for single
+    column usage, and :py:attr:`splitsArray` is for multiple columns.
+
+    >>> values = [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3), (1.5, float("nan")),
+    ...     (float("nan"), 1.0), (float("nan"), 0.0)]
+    >>> df = spark.createDataFrame(values, ["values1", "values2"])
     >>> bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],
-    ...     inputCol="values", outputCol="buckets")
-    >>> bucketed = bucketizer.setHandleInvalid("keep").transform(df).collect()
-    >>> len(bucketed)
-    6
-    >>> bucketed[0].buckets
-    0.0
-    >>> bucketed[1].buckets
-    0.0
-    >>> bucketed[2].buckets
-    1.0
-    >>> bucketed[3].buckets
-    2.0
+    ...     inputCol="values1", outputCol="buckets")
+    >>> bucketed = bucketizer.setHandleInvalid("keep").transform(df.select("values1"))
+    >>> bucketed.show(truncate=False)
+    +-------+-------+
+    |values1|buckets|
+    +-------+-------+
+    |0.1    |0.0    |
+    |0.4    |0.0    |
+    |1.2    |1.0    |
+    |1.5    |2.0    |
+    |NaN    |3.0    |
+    |NaN    |3.0    |
+    +-------+-------+
+    ...
     >>> bucketizer.setParams(outputCol="b").transform(df).head().b
     0.0
     >>> bucketizerPath = temp_path + "/bucketizer"
@@ -347,6 +354,22 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
     >>> bucketed = bucketizer.setHandleInvalid("skip").transform(df).collect()
     >>> len(bucketed)
     4
+    >>> bucketizer2 = Bucketizer(splitsArray=
+    ...     [[-float("inf"), 0.5, 1.4, float("inf")], [-float("inf"), 0.5, float("inf")]],
+    ...     inputCols=["values1", "values2"], outputCols=["buckets1", "buckets2"])
+    >>> bucketed2 = bucketizer2.setHandleInvalid("keep").transform(df)
+    >>> bucketed2.show(truncate=False)
+    +-------+-------+--------+--------+
+    |values1|values2|buckets1|buckets2|
+    +-------+-------+--------+--------+
+    |0.1    |0.0    |0.0     |0.0     |
+    |0.4    |1.0    |0.0     |1.0     |
+    |1.2    |1.3    |1.0     |1.0     |
+    |1.5    |NaN    |2.0     |2.0     |
+    |NaN    |1.0    |3.0     |1.0     |
+    |NaN    |0.0    |3.0     |0.0     |
+    +-------+-------+--------+--------+
+    ...
 
     .. versionadded:: 1.4.0
     """
@@ -363,14 +386,30 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
 
     handleInvalid = Param(Params._dummy(), "handleInvalid", "how to handle invalid entries. " +
                           "Options are 'skip' (filter out rows with invalid values), " +
-                          "'error' (throw an error), or 'keep' (keep invalid values in a special " +
-                          "additional bucket).",
+                          "'error' (throw an error), or 'keep' (keep invalid values in a " +
+                          "special additional bucket). Note that in the multiple column " +
+                          "case, the invalid handling is applied to all columns. That said " +
+                          "for 'error' it will throw an error if any invalids are found in " +
+                          "any column, for 'skip' it will skip rows with any invalids in " +
+                          "any columns, etc.",
                           typeConverter=TypeConverters.toString)
 
+    splitsArray = Param(Params._dummy(), "splitsArray", "The array of split points for mapping " +
+                        "continuous features into buckets for multiple columns. For each input " +
+                        "column, with n+1 splits, there are n buckets. A bucket defined by " +
+                        "splits x,y holds values in the range [x,y) except the last bucket, " +
+                        "which also includes y. The splits should be of length >= 3 and " +
+                        "strictly increasing. Values at -inf, inf must be explicitly provided " +
+                        "to cover all Double values; otherwise, values outside the splits " +
+                        "specified will be treated as errors.",
+                        typeConverter=TypeConverters.toListListFloat)
+
     @keyword_only
-    def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
+    def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
+                 splitsArray=None, inputCols=None, outputCols=None):
         """
-        __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
+        __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
+                 splitsArray=None, inputCols=None, outputCols=None)
         """
         super(Bucketizer, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Bucketizer", self.uid)
@@ -380,9 +419,11 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
 
     @keyword_only
     @since("1.4.0")
-    def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
+    def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
+                  splitsArray=None, inputCols=None, outputCols=None):
         """
-        setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
+        setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
+                  splitsArray=None, inputCols=None, outputCols=None)
         Sets params for this Bucketizer.
         """
         kwargs = self._input_kwargs
@@ -402,6 +443,20 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
         """
         return self.getOrDefault(self.splits)
 
+    @since("2.3.0")
+    def setSplitsArray(self, value):
+        """
+        Sets the value of :py:attr:`splitsArray`.
+        """
+        return self._set(splitsArray=value)
+
+    @since("2.3.0")
+    def getSplitsArray(self):
+        """
+        Gets the array of split points or its default value.
+        """
+        return self.getOrDefault(self.splitsArray)
+
 
 @inherit_doc
 class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
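
A small, hedged sketch of the accessors introduced above (split values follow the doctest; `Bucketizer` is importable from `pyspark.ml.feature`):

```python
from pyspark.ml.feature import Bucketizer

b = Bucketizer(inputCols=["values1", "values2"], outputCols=["buckets1", "buckets2"])

# setSplitsArray/getSplitsArray are the multi-column counterparts of
# setSplits/getSplits; each inner list holds the split points for one input column.
b.setSplitsArray([[-float("inf"), 0.5, 1.4, float("inf")],
                  [-float("inf"), 0.5, float("inf")]])
print(b.getSplitsArray())  # [[-inf, 0.5, 1.4, inf], [-inf, 0.5, inf]]
```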

http://git-wip-us.apache.org/repos/asf/spark/blob/ab1b5d92/python/pyspark/ml/param/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 043c25c..5b6b702 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -135,6 +135,16 @@ class TypeConverters(object):
         raise TypeError("Could not convert %s to list of floats" % value)
 
     @staticmethod
+    def toListListFloat(value):
+        """
+        Convert a value to list of list of floats, if possible.
+        """
+        if TypeConverters._can_convert_to_list(value):
+            value = TypeConverters.toList(value)
+            return [TypeConverters.toListFloat(v) for v in value]
+        raise TypeError("Could not convert %s to list of list of floats" % 
value)
+
+    @staticmethod
     def toListInt(value):
         """
         Convert a value to list of ints, if possible.
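
As a quick illustration of the converter added above (mirroring the new test in tests.py; no SparkSession is needed for Param type conversion):

```python
from pyspark.ml.param import TypeConverters

# Each inner list is converted with toListFloat, so ints are promoted to floats
# and any non-numeric element raises TypeError.
print(TypeConverters.toListListFloat([[-0.1, 0.5, 3], [-5, 1.5]]))
# [[-0.1, 0.5, 3.0], [-5.0, 1.5]]

try:
    TypeConverters.toListListFloat([[-5, 1.5], ["a", 1.0]])
except TypeError as err:
    print(err)  # Could not convert ['a', 1.0] to list of floats
```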

http://git-wip-us.apache.org/repos/asf/spark/blob/ab1b5d92/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 1af2b91..b8bddbd 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -238,6 +238,15 @@ class ParamTypeConversionTests(PySparkTestCase):
         self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept=1))
         self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept="false"))
 
+    def test_list_list_float(self):
+        b = Bucketizer(splitsArray=[[-0.1, 0.5, 3], [-5, 1.5]])
+        self.assertEqual(b.getSplitsArray(), [[-0.1, 0.5, 3.0], [-5.0, 1.5]])
+        self.assertTrue(all([type(v) == list for v in b.getSplitsArray()]))
+        self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[0]]))
+        self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[1]]))
+        self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=["a", 
1.0]))
+        self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=[[-5, 
1.5], ["a", 1.0]]))
+
 
 class PipelineTests(PySparkTestCase):
 

