Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18659#discussion_r139583530
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
    +class VectorizedUDFTests(ReusedPySparkTestCase):
    +
    +    @classmethod
    +    def setUpClass(cls):
    +        ReusedPySparkTestCase.setUpClass()
    +        cls.spark = SparkSession(cls.sc)
    +
    +    @classmethod
    +    def tearDownClass(cls):
    +        ReusedPySparkTestCase.tearDownClass()
    +        cls.spark.stop()
    +
    +    def test_vectorized_udf_basic(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        df = self.spark.range(10).select(
    +            col('id').cast('string').alias('str'),
    +            col('id').cast('int').alias('int'),
    +            col('id').alias('long'),
    +            col('id').cast('float').alias('float'),
    +            col('id').cast('double').alias('double'),
    +            col('id').cast('boolean').alias('bool'))
    +        f = lambda x: x
    +        str_f = pandas_udf(f, StringType())
    +        int_f = pandas_udf(f, IntegerType())
    +        long_f = pandas_udf(f, LongType())
    +        float_f = pandas_udf(f, FloatType())
    +        double_f = pandas_udf(f, DoubleType())
    +        bool_f = pandas_udf(f, BooleanType())
    +        res = df.select(str_f(col('str')), int_f(col('int')),
    +                        long_f(col('long')), float_f(col('float')),
    +                        double_f(col('double')), bool_f(col('bool')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_boolean(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(True,), (True,), (None,), (False,)]
    +        schema = StructType().add("bool", BooleanType())
    +        df = self.spark.createDataFrame(data, schema)
    +        bool_f = pandas_udf(lambda x: x, BooleanType())
    +        res = df.select(bool_f(col('bool')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_byte(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(None,), (2,), (3,), (4,)]
    +        schema = StructType().add("byte", ByteType())
    +        df = self.spark.createDataFrame(data, schema)
    +        byte_f = pandas_udf(lambda x: x, ByteType())
    +        res = df.select(byte_f(col('byte')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_short(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(None,), (2,), (3,), (4,)]
    +        schema = StructType().add("short", ShortType())
    +        df = self.spark.createDataFrame(data, schema)
    +        short_f = pandas_udf(lambda x: x, ShortType())
    +        res = df.select(short_f(col('short')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_int(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(None,), (2,), (3,), (4,)]
    +        schema = StructType().add("int", IntegerType())
    +        df = self.spark.createDataFrame(data, schema)
    +        int_f = pandas_udf(lambda x: x, IntegerType())
    +        res = df.select(int_f(col('int')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_long(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(None,), (2,), (3,), (4,)]
    +        schema = StructType().add("long", LongType())
    +        df = self.spark.createDataFrame(data, schema)
    +        long_f = pandas_udf(lambda x: x, LongType())
    +        res = df.select(long_f(col('long')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_float(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(3.0,), (5.0,), (-1.0,), (None,)]
    +        schema = StructType().add("float", FloatType())
    +        df = self.spark.createDataFrame(data, schema)
    +        float_f = pandas_udf(lambda x: x, FloatType())
    +        res = df.select(float_f(col('float')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_double(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [(3.0,), (5.0,), (-1.0,), (None,)]
    +        schema = StructType().add("double", DoubleType())
    +        df = self.spark.createDataFrame(data, schema)
    +        double_f = pandas_udf(lambda x: x, DoubleType())
    +        res = df.select(double_f(col('double')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_null_string(self):
    +        from pyspark.sql.functions import pandas_udf, col
    +        data = [("foo",), (None,), ("bar",), ("bar",)]
    +        schema = StructType().add("str", StringType())
    +        df = self.spark.createDataFrame(data, schema)
    +        str_f = pandas_udf(lambda x: x, StringType())
    +        res = df.select(str_f(col('str')))
    +        self.assertEquals(df.collect(), res.collect())
    +
    +    def test_vectorized_udf_zero_parameter(self):
    +        from pyspark.sql.functions import pandas_udf
    +        import pandas as pd
    +        df = self.spark.range(100000)
    --- End diff --
    
    I think it's fine to reduce the range.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to